Block processing cleanup (#4153)

* Implements Ord for BlobSidecar based on index

* Use BTreeMap for gossip cache to maintain blob order by index

* fmt

* Another panic fix
This commit is contained in:
Pawan Dhananjay
2023-04-03 15:07:11 +05:30
committed by GitHub
parent deec9c51ba
commit ffefd20137
5 changed files with 132 additions and 49 deletions

View File

@@ -11,7 +11,7 @@ use slot_clock::SlotClock;
use ssz_types::{Error, VariableList};
use state_processing::per_block_processing::deneb::deneb::verify_kzg_commitments_against_transactions;
use std::collections::hash_map::{Entry, OccupiedEntry};
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use types::beacon_block_body::KzgCommitments;
use types::blob_sidecar::{BlobIdentifier, BlobSidecar};
@@ -54,37 +54,51 @@ impl From<ssz_types::Error> for AvailabilityCheckError {
/// - blocks that have been fully verified and only require a data availability check
pub struct DataAvailabilityChecker<T: EthSpec, S: SlotClock> {
rpc_blob_cache: RwLock<HashMap<BlobIdentifier, Arc<BlobSidecar<T>>>>,
gossip_blob_cache: Mutex<HashMap<Hash256, GossipBlobCache<T>>>,
gossip_availability_cache: Mutex<HashMap<Hash256, GossipAvailabilityCache<T>>>,
slot_clock: S,
kzg: Option<Arc<Kzg>>,
spec: ChainSpec,
}
struct GossipBlobCache<T: EthSpec> {
verified_blobs: Vec<KzgVerifiedBlob<T>>,
/// Caches partially available blobs and execution verified blocks corresponding
/// to a given block root that are received over gossip.
///
/// The blobs are all gossip and kzg verified.
/// The block has completed all verifications except the availability check.
struct GossipAvailabilityCache<T: EthSpec> {
/// We use a `BTreeMap` here to maintain the order of `BlobSidecar`s based on index.
verified_blobs: BTreeMap<u64, KzgVerifiedBlob<T>>,
executed_block: Option<AvailabilityPendingExecutedBlock<T>>,
}
impl<T: EthSpec> GossipBlobCache<T> {
impl<T: EthSpec> GossipAvailabilityCache<T> {
fn new_from_blob(blob: KzgVerifiedBlob<T>) -> Self {
let mut verified_blobs = BTreeMap::new();
verified_blobs.insert(blob.blob_index(), blob);
Self {
verified_blobs: vec![blob],
verified_blobs,
executed_block: None,
}
}
fn new_from_block(block: AvailabilityPendingExecutedBlock<T>) -> Self {
Self {
verified_blobs: vec![],
verified_blobs: BTreeMap::new(),
executed_block: Some(block),
}
}
/// Returns `true` if the cache has all blobs corresponding to the
/// kzg commitments in the block.
///
/// This is a count-only comparison: the number of cached verified blobs is
/// checked against `block.num_blobs_expected()`. Individual blob indices and
/// commitments are not inspected here.
fn has_all_blobs(&self, block: &AvailabilityPendingExecutedBlock<T>) -> bool {
self.verified_blobs.len() == block.num_blobs_expected()
}
}
/// This type is returned after adding a block / blob to the `DataAvailabilityChecker`.
///
/// Indicates if the block is fully `Available` or if we need blobs or blocks
/// to "complete" the requirements for an `AvailableBlock`.
pub enum Availability<T: EthSpec> {
PendingBlobs(Vec<BlobIdentifier>),
PendingBlock(Hash256),
@@ -92,6 +106,8 @@ pub enum Availability<T: EthSpec> {
}
impl<T: EthSpec> Availability<T> {
/// Returns all the blob identifiers associated with an `AvailableBlock`.
/// Returns `None` if availability hasn't been fully satisfied yet.
pub fn get_available_blob_ids(&self) -> Option<Vec<BlobIdentifier>> {
if let Self::Available(block) = self {
Some(block.get_all_blob_ids())
@@ -105,7 +121,7 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
pub fn new(slot_clock: S, kzg: Option<Arc<Kzg>>, spec: ChainSpec) -> Self {
Self {
rpc_blob_cache: <_>::default(),
gossip_blob_cache: <_>::default(),
gossip_availability_cache: <_>::default(),
slot_clock,
kzg,
spec,
@@ -117,9 +133,9 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
self.rpc_blob_cache.read().get(blob_id).cloned()
}
/// This first validate the KZG commitments included in the blob sidecar.
/// This first validates the KZG commitments included in the blob sidecar.
/// Check if we've cached other blobs for this block. If it completes a set and we also
/// have a block cached, return the Availability variant triggering block import.
/// have a block cached, return the `Availability` variant triggering block import.
/// Otherwise cache the blob sidecar.
///
/// This should only accept gossip verified blobs, so we should not have to worry about dupes.
@@ -138,7 +154,7 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
let blob = kzg_verified_blob.clone_blob();
let mut blob_cache = self.gossip_blob_cache.lock();
let mut blob_cache = self.gossip_availability_cache.lock();
// Gossip cache.
let availability = match blob_cache.entry(blob.block_root) {
@@ -149,7 +165,7 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
cache
.verified_blobs
.insert(blob.index as usize, kzg_verified_blob);
.insert(kzg_verified_blob.blob_index(), kzg_verified_blob);
if let Some(executed_block) = cache.executed_block.take() {
self.check_block_availability_maybe_cache(occupied_entry, executed_block)?
@@ -159,7 +175,7 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
}
Entry::Vacant(vacant_entry) => {
let block_root = kzg_verified_blob.block_root();
vacant_entry.insert(GossipBlobCache::new_from_blob(kzg_verified_blob));
vacant_entry.insert(GossipAvailabilityCache::new_from_blob(kzg_verified_blob));
Availability::PendingBlock(block_root)
}
};
@@ -181,7 +197,7 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
&self,
executed_block: AvailabilityPendingExecutedBlock<T>,
) -> Result<Availability<T>, AvailabilityCheckError> {
let mut guard = self.gossip_blob_cache.lock();
let mut guard = self.gossip_availability_cache.lock();
let entry = guard.entry(executed_block.import_data.block_root);
let availability = match entry {
@@ -190,7 +206,7 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
}
Entry::Vacant(vacant_entry) => {
let all_blob_ids = executed_block.get_all_blob_ids();
vacant_entry.insert(GossipBlobCache::new_from_block(executed_block));
vacant_entry.insert(GossipAvailabilityCache::new_from_block(executed_block));
Availability::PendingBlobs(all_blob_ids)
}
};
@@ -204,9 +220,16 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
Ok(availability)
}
/// Checks if the provided `executed_block` contains all required blobs to be considered an
/// `AvailableBlock` based on blobs that are cached.
///
/// Returns an error if there was an error when matching the block commitments against blob commitments.
///
/// Returns `Ok(Availability::Available(_))` if all blobs for the block are present in cache.
/// Returns `Ok(Availability::PendingBlobs(_))` if some of the corresponding blobs have not yet been received in the cache.
fn check_block_availability_maybe_cache(
&self,
mut occupied_entry: OccupiedEntry<Hash256, GossipBlobCache<T>>,
mut occupied_entry: OccupiedEntry<Hash256, GossipAvailabilityCache<T>>,
executed_block: AvailabilityPendingExecutedBlock<T>,
) -> Result<Availability<T>, AvailabilityCheckError> {
if occupied_entry.get().has_all_blobs(&executed_block) {
@@ -216,9 +239,13 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
payload_verification_outcome,
} = executed_block;
let cache = occupied_entry.remove();
let GossipAvailabilityCache {
verified_blobs,
executed_block: _,
} = occupied_entry.remove();
let verified_blobs = verified_blobs.into_values().collect();
let available_block = self.make_available(block, cache.verified_blobs)?;
let available_block = self.make_available(block, verified_blobs)?;
Ok(Availability::Available(Box::new(
AvailableExecutedBlock::new(
available_block,
@@ -227,18 +254,18 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
),
)))
} else {
let cache = occupied_entry.get_mut();
let cached_entry = occupied_entry.get_mut();
let missing_blob_ids = executed_block
.get_filtered_blob_ids(|index| cache.verified_blobs.get(index).is_none());
.get_filtered_blob_ids(|index| cached_entry.verified_blobs.get(&index).is_none());
let _ = cache.executed_block.insert(executed_block);
let _ = cached_entry.executed_block.insert(executed_block);
Ok(Availability::PendingBlobs(missing_blob_ids))
}
}
/// Checks if a block is available, returns a MaybeAvailableBlock enum that may include the fully
/// Checks if a block is available, returns a `MaybeAvailableBlock` that may include the fully
/// available block.
pub fn check_availability(
&self,
@@ -321,11 +348,11 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
/// Verifies an AvailabilityPendingBlock against a set of KZG verified blobs.
/// This does not check whether a block *should* have blobs; these checks must have been
/// completed when producing the AvailabilityPendingBlock.
/// completed when producing the `AvailabilityPendingBlock`.
pub fn make_available(
&self,
block: AvailabilityPendingBlock<T>,
blobs: KzgVerifiedBlobList<T>,
blobs: Vec<KzgVerifiedBlob<T>>,
) -> Result<AvailableBlock<T>, AvailabilityCheckError> {
let block_kzg_commitments = block.kzg_commitments()?;
if blobs.len() != block_kzg_commitments.len() {
@@ -428,6 +455,12 @@ pub enum BlobRequirements {
PreDeneb,
}
/// A wrapper over a `SignedBeaconBlock` for which we have not verified the availability of
/// the corresponding `BlobSidecar`s and which is, hence, not ready for import into fork choice.
///
/// Note: This wrapper does not necessarily correspond to a pre-deneb block as a pre-deneb
/// block that is ready for import will be of type `AvailableBlock` with its `blobs` field
/// set to `VerifiedBlobs::PreDeneb`.
#[derive(Clone, Debug, PartialEq)]
pub struct AvailabilityPendingBlock<E: EthSpec> {
block: Arc<SignedBeaconBlock<E>>,
@@ -452,6 +485,20 @@ impl<E: EthSpec> AvailabilityPendingBlock<E> {
}
}
/// Blob availability outcome for a block: either the blobs themselves, or the
/// reason why no blobs are required for it.
#[derive(Clone, Debug, PartialEq)]
pub enum VerifiedBlobs<E: EthSpec> {
/// These blobs are available.
Available(BlobSidecarList<E>),
/// This block is from outside the data availability boundary so doesn't require
/// a data availability check.
NotRequired,
/// The block's `kzg_commitments` field is empty so it does not contain any blobs.
EmptyBlobs,
/// This is a block prior to the 4844 fork, so doesn't require any blobs.
PreDeneb,
}
/// A fully available block that is ready to be imported into fork choice.
#[derive(Clone, Debug, PartialEq)]
pub struct AvailableBlock<E: EthSpec> {
block: Arc<SignedBeaconBlock<E>>,
@@ -473,19 +520,6 @@ impl<E: EthSpec> AvailableBlock<E> {
}
}
#[derive(Clone, Debug, PartialEq)]
pub enum VerifiedBlobs<E: EthSpec> {
/// These blobs are available.
Available(BlobSidecarList<E>),
/// This block is from outside the data availability boundary so doesn't require
/// a data availability check.
NotRequired,
/// The block's `kzg_commitments` field is empty so it does not contain any blobs.
EmptyBlobs,
/// This is a block prior to the 4844 fork, so doesn't require any blobs
PreDeneb,
}
impl<E: EthSpec> AsBlock<E> for AvailableBlock<E> {
fn slot(&self) -> Slot {
self.block.slot()