mirror of
https://github.com/sigp/lighthouse.git
synced 2026-04-19 13:58:28 +00:00
Merge branch 'deneb-free-blobs' of https://github.com/sigp/lighthouse into some-blob-reprocessing-work
This commit is contained in:
@@ -6,9 +6,9 @@ use crate::block_verification::{AvailabilityPendingExecutedBlock, AvailableExecu
|
||||
|
||||
use kzg::Error as KzgError;
|
||||
use kzg::Kzg;
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use parking_lot::RwLock;
|
||||
use slot_clock::SlotClock;
|
||||
use ssz_types::{Error, VariableList};
|
||||
use ssz_types::{Error, FixedVector, VariableList};
|
||||
use state_processing::per_block_processing::deneb::deneb::verify_kzg_commitments_against_transactions;
|
||||
use std::collections::hash_map::{Entry, OccupiedEntry};
|
||||
use std::collections::HashMap;
|
||||
@@ -53,23 +53,34 @@ impl From<ssz_types::Error> for AvailabilityCheckError {
|
||||
/// have not been verified against blobs
|
||||
/// - blocks that have been fully verified and only require a data availability check
|
||||
pub struct DataAvailabilityChecker<T: EthSpec, S: SlotClock> {
|
||||
rpc_blob_cache: RwLock<HashMap<BlobIdentifier, Arc<BlobSidecar<T>>>>,
|
||||
gossip_blob_cache: Mutex<HashMap<Hash256, GossipBlobCache<T>>>,
|
||||
availability_cache: RwLock<HashMap<Hash256, ReceivedComponents<T>>>,
|
||||
slot_clock: S,
|
||||
kzg: Option<Arc<Kzg>>,
|
||||
spec: ChainSpec,
|
||||
}
|
||||
|
||||
struct GossipBlobCache<T: EthSpec> {
|
||||
verified_blobs: Vec<KzgVerifiedBlob<T>>,
|
||||
/// Caches partially available blobs and execution verified blocks corresponding
|
||||
/// to a given `block_hash` that are received over gossip.
|
||||
///
|
||||
/// The blobs are all gossip and kzg verified.
|
||||
/// The block has completed all verifications except the availability check.
|
||||
struct ReceivedComponents<T: EthSpec> {
|
||||
/// We use a `BTreeMap` here to maintain the order of `BlobSidecar`s based on index.
|
||||
verified_blobs: FixedVector<Option<KzgVerifiedBlob<T>>, T::MaxBlobsPerBlock>,
|
||||
executed_block: Option<AvailabilityPendingExecutedBlock<T>>,
|
||||
missing_blob_ids: Vec<BlobIdentifier>,
|
||||
}
|
||||
|
||||
impl<T: EthSpec> GossipBlobCache<T> {
|
||||
impl<T: EthSpec> ReceivedComponents<T> {
|
||||
fn new_from_blob(blob: KzgVerifiedBlob<T>) -> Self {
|
||||
let mut verified_blobs = FixedVector::<_, _>::default();
|
||||
// TODO: verify that we've already ensured the blob index < T::MaxBlobsPerBlock
|
||||
if let Some(mut_maybe_blob) = verified_blobs.get_mut(blob.blob_index() as usize) {
|
||||
*mut_maybe_blob = Some(blob);
|
||||
}
|
||||
|
||||
Self {
|
||||
verified_blobs: vec![blob],
|
||||
verified_blobs,
|
||||
executed_block: None,
|
||||
missing_blob_ids: vec![],
|
||||
}
|
||||
@@ -78,17 +89,33 @@ impl<T: EthSpec> GossipBlobCache<T> {
|
||||
fn new_from_block(block: AvailabilityPendingExecutedBlock<T>) -> Self {
|
||||
let missing_blob_ids = block.get_all_blob_ids();
|
||||
Self {
|
||||
verified_blobs: vec![],
|
||||
verified_blobs: <_>::default(),
|
||||
executed_block: Some(block),
|
||||
missing_blob_ids,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns `true` if the cache has all blobs corresponding to the
|
||||
/// kzg commitments in the block.
|
||||
fn has_all_blobs(&self, block: &AvailabilityPendingExecutedBlock<T>) -> bool {
|
||||
self.verified_blobs.len() == block.num_blobs_expected()
|
||||
for i in 0..block.num_blobs_expected() {
|
||||
if self
|
||||
.verified_blobs
|
||||
.get(i)
|
||||
.map(|maybe_blob| maybe_blob.is_none())
|
||||
.unwrap_or(true)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
/// This type is returned after adding a block / blob to the `DataAvailabilityChecker`.
|
||||
///
|
||||
/// Indicates if the block is fully `Available` or if we need blobs or blocks
|
||||
/// to "complete" the requirements for an `AvailableBlock`.
|
||||
pub enum Availability<T: EthSpec> {
|
||||
PendingBlobs(Vec<BlobIdentifier>),
|
||||
PendingBlock(Hash256),
|
||||
@@ -96,6 +123,8 @@ pub enum Availability<T: EthSpec> {
|
||||
}
|
||||
|
||||
impl<T: EthSpec> Availability<T> {
|
||||
/// Returns all the blob identifiers associated with an `AvailableBlock`.
|
||||
/// Returns `None` if avaiability hasn't been fully satisfied yet.
|
||||
pub fn get_available_blob_ids(&self) -> Option<Vec<BlobIdentifier>> {
|
||||
if let Self::Available(block) = self {
|
||||
Some(block.get_all_blob_ids())
|
||||
@@ -108,17 +137,22 @@ impl<T: EthSpec> Availability<T> {
|
||||
impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
|
||||
pub fn new(slot_clock: S, kzg: Option<Arc<Kzg>>, spec: ChainSpec) -> Self {
|
||||
Self {
|
||||
rpc_blob_cache: <_>::default(),
|
||||
gossip_blob_cache: <_>::default(),
|
||||
availability_cache: <_>::default(),
|
||||
slot_clock,
|
||||
kzg,
|
||||
spec,
|
||||
}
|
||||
}
|
||||
|
||||
/// Get a blob from the RPC cache.
|
||||
/// Get a blob from the availability cache.
|
||||
pub fn get_blob(&self, blob_id: &BlobIdentifier) -> Option<Arc<BlobSidecar<T>>> {
|
||||
self.rpc_blob_cache.read().get(blob_id).cloned()
|
||||
self.availability_cache
|
||||
.read()
|
||||
.get(&blob_id.block_root)?
|
||||
.verified_blobs
|
||||
.get(blob_id.index as usize)?
|
||||
.as_ref()
|
||||
.map(|kzg_verified_blob| kzg_verified_blob.clone_blob())
|
||||
}
|
||||
|
||||
pub fn put_rpc_blob(
|
||||
@@ -137,9 +171,9 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
|
||||
})
|
||||
}
|
||||
|
||||
/// This first validate the KZG commitments included in the blob sidecar.
|
||||
/// This first validates the KZG commitments included in the blob sidecar.
|
||||
/// Check if we've cached other blobs for this block. If it completes a set and we also
|
||||
/// have a block cached, return the Availability variant triggering block import.
|
||||
/// have a block cached, return the `Availability` variant triggering block import.
|
||||
/// Otherwise cache the blob sidecar.
|
||||
///
|
||||
/// This should only accept gossip verified blobs, so we should not have to worry about dupes.
|
||||
@@ -154,34 +188,24 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
|
||||
return Err(AvailabilityCheckError::KzgNotInitialized);
|
||||
};
|
||||
|
||||
self.put_kzg_verified_blob(kzg_verified_blob, |_, _| true)
|
||||
}
|
||||
|
||||
fn put_kzg_verified_blob(
|
||||
&self,
|
||||
kzg_verified_blob: KzgVerifiedBlob<T>,
|
||||
predicate: impl FnOnce(BlobIdentifier, &[BlobIdentifier]) -> bool,
|
||||
) -> Result<Availability<T>, AvailabilityCheckError> {
|
||||
let blob = kzg_verified_blob.clone_blob();
|
||||
let blob_id = blob.id();
|
||||
let mut blob_cache = self.gossip_blob_cache.lock();
|
||||
|
||||
// Gossip cache.
|
||||
let availability = match blob_cache.entry(blob.block_root) {
|
||||
let availability = match self
|
||||
.availability_cache
|
||||
.write()
|
||||
.entry(kzg_verified_blob.block_root())
|
||||
{
|
||||
Entry::Occupied(mut occupied_entry) => {
|
||||
// All blobs reaching this cache should be gossip verified and gossip verification
|
||||
// should filter duplicates, as well as validate indices.
|
||||
let cache = occupied_entry.get_mut();
|
||||
let received_components = occupied_entry.get_mut();
|
||||
|
||||
if !predicate(blob_id, cache.missing_blob_ids.as_slice()) {
|
||||
// ignore this blob
|
||||
if let Some(maybe_verified_blob) = received_components
|
||||
.verified_blobs
|
||||
.get_mut(kzg_verified_blob.blob_index() as usize)
|
||||
{
|
||||
*maybe_verified_blob = Some(kzg_verified_blob)
|
||||
}
|
||||
|
||||
cache
|
||||
.verified_blobs
|
||||
.insert(blob.index as usize, kzg_verified_blob);
|
||||
|
||||
if let Some(executed_block) = cache.executed_block.take() {
|
||||
if let Some(executed_block) = received_components.executed_block.take() {
|
||||
self.check_block_availability_maybe_cache(occupied_entry, executed_block)?
|
||||
} else {
|
||||
Availability::PendingBlock(blob.block_root)
|
||||
@@ -189,18 +213,11 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
|
||||
}
|
||||
Entry::Vacant(vacant_entry) => {
|
||||
let block_root = kzg_verified_blob.block_root();
|
||||
vacant_entry.insert(GossipBlobCache::new_from_blob(kzg_verified_blob));
|
||||
vacant_entry.insert(ReceivedComponents::new_from_blob(kzg_verified_blob));
|
||||
Availability::PendingBlock(block_root)
|
||||
}
|
||||
};
|
||||
|
||||
drop(blob_cache);
|
||||
|
||||
if let Some(blob_ids) = availability.get_available_blob_ids() {
|
||||
self.prune_rpc_blob_cache(&blob_ids);
|
||||
} else {
|
||||
self.rpc_blob_cache.write().insert(blob_id, blob);
|
||||
}
|
||||
Ok(availability)
|
||||
}
|
||||
|
||||
@@ -210,44 +227,56 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
|
||||
&self,
|
||||
executed_block: AvailabilityPendingExecutedBlock<T>,
|
||||
) -> Result<Availability<T>, AvailabilityCheckError> {
|
||||
let mut guard = self.gossip_blob_cache.lock();
|
||||
let entry = guard.entry(executed_block.import_data.block_root);
|
||||
|
||||
let availability = match entry {
|
||||
let availability = match self
|
||||
.availability_cache
|
||||
.write()
|
||||
.entry(executed_block.import_data.block_root)
|
||||
{
|
||||
Entry::Occupied(occupied_entry) => {
|
||||
self.check_block_availability_maybe_cache(occupied_entry, executed_block)?
|
||||
}
|
||||
Entry::Vacant(vacant_entry) => {
|
||||
let all_blob_ids = executed_block.get_all_blob_ids();
|
||||
vacant_entry.insert(GossipBlobCache::new_from_block(executed_block));
|
||||
vacant_entry.insert(ReceivedComponents::new_from_block(executed_block));
|
||||
Availability::PendingBlobs(all_blob_ids)
|
||||
}
|
||||
};
|
||||
|
||||
drop(guard);
|
||||
|
||||
if let Some(blob_ids) = availability.get_available_blob_ids() {
|
||||
self.prune_rpc_blob_cache(&blob_ids);
|
||||
}
|
||||
|
||||
Ok(availability)
|
||||
}
|
||||
|
||||
/// Checks if the provided `executed_block` contains all required blobs to be considered an
|
||||
/// `AvailableBlock` based on blobs that are cached.
|
||||
///
|
||||
/// Returns an error if there was an error when matching the block commitments against blob commitments.
|
||||
///
|
||||
/// Returns `Ok(Availability::Available(_))` if all blobs for the block are present in cache.
|
||||
/// Returns `Ok(Availability::PendingBlobs(_))` if all corresponding blobs have not been received in the cache.
|
||||
fn check_block_availability_maybe_cache(
|
||||
&self,
|
||||
mut occupied_entry: OccupiedEntry<Hash256, GossipBlobCache<T>>,
|
||||
mut occupied_entry: OccupiedEntry<Hash256, ReceivedComponents<T>>,
|
||||
executed_block: AvailabilityPendingExecutedBlock<T>,
|
||||
) -> Result<Availability<T>, AvailabilityCheckError> {
|
||||
if occupied_entry.get().has_all_blobs(&executed_block) {
|
||||
let num_blobs_expected = executed_block.num_blobs_expected();
|
||||
let AvailabilityPendingExecutedBlock {
|
||||
block,
|
||||
import_data,
|
||||
payload_verification_outcome,
|
||||
} = executed_block;
|
||||
|
||||
let cache = occupied_entry.remove();
|
||||
let ReceivedComponents {
|
||||
verified_blobs,
|
||||
executed_block: _,
|
||||
} = occupied_entry.remove();
|
||||
|
||||
let available_block = self.make_available(block, cache.verified_blobs)?;
|
||||
let verified_blobs = Vec::from(verified_blobs)
|
||||
.into_iter()
|
||||
.take(num_blobs_expected)
|
||||
.map(|maybe_blob| maybe_blob.ok_or(AvailabilityCheckError::MissingBlobs))
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
let available_block = self.make_available(block, verified_blobs)?;
|
||||
Ok(Availability::Available(Box::new(
|
||||
AvailableExecutedBlock::new(
|
||||
available_block,
|
||||
@@ -256,19 +285,23 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
|
||||
),
|
||||
)))
|
||||
} else {
|
||||
let cache = occupied_entry.get_mut();
|
||||
let received_components = occupied_entry.get_mut();
|
||||
|
||||
let missing_blob_ids = executed_block
|
||||
.get_filtered_blob_ids(|index| cache.verified_blobs.get(index).is_none());
|
||||
let missing_blob_ids = executed_block.get_filtered_blob_ids(|index| {
|
||||
received_components
|
||||
.verified_blobs
|
||||
.get(index as usize)
|
||||
.map(|maybe_blob| maybe_blob.is_none())
|
||||
.unwrap_or(true)
|
||||
});
|
||||
|
||||
let _ = cache.executed_block.insert(executed_block);
|
||||
cache.missing_blob_ids = missing_blob_ids.clone();
|
||||
let _ = received_components.executed_block.insert(executed_block);
|
||||
|
||||
Ok(Availability::PendingBlobs(missing_blob_ids))
|
||||
}
|
||||
}
|
||||
|
||||
/// Checks if a block is available, returns a MaybeAvailableBlock enum that may include the fully
|
||||
/// Checks if a block is available, returns a `MaybeAvailableBlock` that may include the fully
|
||||
/// available block.
|
||||
pub fn check_availability(
|
||||
&self,
|
||||
@@ -351,11 +384,11 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
|
||||
|
||||
/// Verifies an AvailabilityPendingBlock against a set of KZG verified blobs.
|
||||
/// This does not check whether a block *should* have blobs, these checks should must have been
|
||||
/// completed when producing the AvailabilityPendingBlock.
|
||||
/// completed when producing the `AvailabilityPendingBlock`.
|
||||
pub fn make_available(
|
||||
&self,
|
||||
block: AvailabilityPendingBlock<T>,
|
||||
blobs: KzgVerifiedBlobList<T>,
|
||||
blobs: Vec<KzgVerifiedBlob<T>>,
|
||||
) -> Result<AvailableBlock<T>, AvailabilityCheckError> {
|
||||
let block_kzg_commitments = block.kzg_commitments()?;
|
||||
if blobs.len() != block_kzg_commitments.len() {
|
||||
@@ -438,13 +471,6 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
|
||||
self.data_availability_boundary()
|
||||
.map_or(false, |da_epoch| block_epoch >= da_epoch)
|
||||
}
|
||||
|
||||
pub fn prune_rpc_blob_cache(&self, blob_ids: &[BlobIdentifier]) {
|
||||
let mut guard = self.rpc_blob_cache.write();
|
||||
for id in blob_ids {
|
||||
guard.remove(id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub enum BlobRequirements {
|
||||
@@ -458,6 +484,12 @@ pub enum BlobRequirements {
|
||||
PreDeneb,
|
||||
}
|
||||
|
||||
/// A wrapper over a `SignedBeaconBlock` where we have not verified availability of
|
||||
/// corresponding `BlobSidecar`s and hence, is not ready for import into fork choice.
|
||||
///
|
||||
/// Note: This wrapper does not necessarily correspond to a pre-deneb block as a pre-deneb
|
||||
/// block that is ready for import will be of type `AvailableBlock` with its `blobs` field
|
||||
/// set to `VerifiedBlobs::PreDeneb`.
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub struct AvailabilityPendingBlock<E: EthSpec> {
|
||||
block: Arc<SignedBeaconBlock<E>>,
|
||||
@@ -482,6 +514,20 @@ impl<E: EthSpec> AvailabilityPendingBlock<E> {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub enum VerifiedBlobs<E: EthSpec> {
|
||||
/// These blobs are available.
|
||||
Available(BlobSidecarList<E>),
|
||||
/// This block is from outside the data availability boundary so doesn't require
|
||||
/// a data availability check.
|
||||
NotRequired,
|
||||
/// The block's `kzg_commitments` field is empty so it does not contain any blobs.
|
||||
EmptyBlobs,
|
||||
/// This is a block prior to the 4844 fork, so doesn't require any blobs
|
||||
PreDeneb,
|
||||
}
|
||||
|
||||
/// A fully available block that is ready to be imported into fork choice.
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub struct AvailableBlock<E: EthSpec> {
|
||||
block: Arc<SignedBeaconBlock<E>>,
|
||||
@@ -503,19 +549,6 @@ impl<E: EthSpec> AvailableBlock<E> {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub enum VerifiedBlobs<E: EthSpec> {
|
||||
/// These blobs are available.
|
||||
Available(BlobSidecarList<E>),
|
||||
/// This block is from outside the data availability boundary so doesn't require
|
||||
/// a data availability check.
|
||||
NotRequired,
|
||||
/// The block's `kzg_commitments` field is empty so it does not contain any blobs.
|
||||
EmptyBlobs,
|
||||
/// This is a block prior to the 4844 fork, so doesn't require any blobs
|
||||
PreDeneb,
|
||||
}
|
||||
|
||||
impl<E: EthSpec> AsBlock<E> for AvailableBlock<E> {
|
||||
fn slot(&self) -> Slot {
|
||||
self.block.slot()
|
||||
|
||||
Reference in New Issue
Block a user