compile after merge

This commit is contained in:
realbigsean
2023-05-18 16:42:13 -04:00
parent 4dd1184968
commit bef63e42f7
8 changed files with 150 additions and 327 deletions

View File

@@ -511,9 +511,7 @@ impl<T: EthSpec> TryInto<AvailableBlock<T>> for MaybeAvailableBlock<T> {
fn try_into(self) -> Result<AvailableBlock<T>, Self::Error> { fn try_into(self) -> Result<AvailableBlock<T>, Self::Error> {
match self { match self {
Self::Available(block) => Ok(block), Self::Available(block) => Ok(block),
Self::AvailabilityPending(block) => Err(AvailabilityCheckError::MissingBlobs( Self::AvailabilityPending(block) => Err(AvailabilityCheckError::MissingBlobs),
block.as_block().canonical_root(),
)),
} }
} }
} }

View File

@@ -861,12 +861,10 @@ where
slasher: self.slasher.clone(), slasher: self.slasher.clone(),
validator_monitor: RwLock::new(validator_monitor), validator_monitor: RwLock::new(validator_monitor),
//TODO(sean) should we move kzg solely to the da checker? //TODO(sean) should we move kzg solely to the da checker?
data_availability_checker: Arc::new(DataAvailabilityChecker::new( data_availability_checker: Arc::new(
slot_clock, DataAvailabilityChecker::new(slot_clock, kzg.clone(), store, self.spec)
kzg.clone(), .map_err(|e| format!("Error initializing DataAvailabiltyChecker: {:?}", e))?,
store, ),
self.spec,
).map_err(|e| format!("Error initializing DataAvailabiltyChecker: {:?}", e))?),
proposal_blob_cache: BlobCache::default(), proposal_blob_cache: BlobCache::default(),
kzg, kzg,
}; };

View File

@@ -10,13 +10,12 @@ use kzg::Error as KzgError;
use kzg::Kzg; use kzg::Kzg;
use slog::{debug, error}; use slog::{debug, error};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use ssz_types::{Error, VariableList}; use ssz_types::{Error, FixedVector, VariableList};
use state_processing::per_block_processing::deneb::deneb::verify_kzg_commitments_against_transactions; use state_processing::per_block_processing::deneb::deneb::verify_kzg_commitments_against_transactions;
use std::collections::hash_map::{Entry, OccupiedEntry}; use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::sync::Arc; use std::sync::Arc;
use task_executor::TaskExecutor;
use strum::IntoStaticStr; use strum::IntoStaticStr;
use task_executor::TaskExecutor;
use types::beacon_block_body::KzgCommitments; use types::beacon_block_body::KzgCommitments;
use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList}; use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
use types::consts::deneb::MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS; use types::consts::deneb::MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS;
@@ -36,7 +35,7 @@ pub enum AvailabilityCheckError {
KzgVerificationFailed, KzgVerificationFailed,
KzgNotInitialized, KzgNotInitialized,
SszTypes(ssz_types::Error), SszTypes(ssz_types::Error),
MissingBlobs(Hash256), MissingBlobs,
NumBlobsMismatch { NumBlobsMismatch {
num_kzg_commitments: usize, num_kzg_commitments: usize,
num_blobs: usize, num_blobs: usize,
@@ -89,56 +88,6 @@ pub struct DataAvailabilityChecker<T: BeaconChainTypes> {
spec: ChainSpec, spec: ChainSpec,
} }
/// Caches partially available blobs and execution verified blocks corresponding
/// to a given `block_hash` that are received over gossip.
///
/// The blobs are all gossip and kzg verified.
/// The block has completed all verifications except the availability check.
struct ReceivedComponents<T: EthSpec> {
verified_blobs: FixedVector<Option<KzgVerifiedBlob<T>>, T::MaxBlobsPerBlock>,
executed_block: Option<AvailabilityPendingExecutedBlock<T>>,
}
impl<T: EthSpec> ReceivedComponents<T> {
fn new_from_blobs(blobs: &[KzgVerifiedBlob<T>]) -> Self {
let mut verified_blobs = FixedVector::<_, _>::default();
for blob in blobs {
// TODO: verify that we've already ensured the blob index < T::MaxBlobsPerBlock
if let Some(mut_maybe_blob) = verified_blobs.get_mut(blob.blob_index() as usize) {
*mut_maybe_blob = Some(blob.clone());
}
}
Self {
verified_blobs,
executed_block: None,
}
}
fn new_from_block(block: AvailabilityPendingExecutedBlock<T>) -> Self {
Self {
verified_blobs: <_>::default(),
executed_block: Some(block),
}
}
/// Returns `true` if the cache has all blobs corresponding to the
/// kzg commitments in the block.
fn has_all_blobs(&self, block: &AvailabilityPendingExecutedBlock<T>) -> bool {
for i in 0..block.num_blobs_expected() {
if self
.verified_blobs
.get(i)
.map(|maybe_blob| maybe_blob.is_none())
.unwrap_or(true)
{
return false;
}
}
true
}
}
/// This type is returned after adding a block / blob to the `DataAvailabilityChecker`. /// This type is returned after adding a block / blob to the `DataAvailabilityChecker`.
/// ///
/// Indicates if the block is fully `Available` or if we need blobs or blocks /// Indicates if the block is fully `Available` or if we need blobs or blocks
@@ -178,34 +127,17 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
} }
pub fn has_block(&self, block_root: &Hash256) -> bool { pub fn has_block(&self, block_root: &Hash256) -> bool {
self.availability_cache self.availability_cache.has_block(block_root)
.read()
.get(block_root)
.map_or(false, |cache| cache.executed_block.is_some())
} }
pub fn get_missing_blob_ids_checking_cache( pub fn get_missing_blob_ids_checking_cache(
&self, &self,
block_root: Hash256, block_root: Hash256,
) -> Option<Vec<BlobIdentifier>> { ) -> Option<Vec<BlobIdentifier>> {
let guard = self.availability_cache.read(); let (block, blob_indices) = self
let (block, blob_indices) = guard .availability_cache
.get(&block_root) .get_missing_blob_ids_checking_cache(block_root);
.map(|cache| { self.get_missing_blob_ids(block_root, block.as_ref(), Some(blob_indices))
let block_opt = cache
.executed_block
.as_ref()
.map(|block| &block.block.block);
let blobs = cache
.verified_blobs
.iter()
.enumerate()
.filter_map(|(i, maybe_blob)| maybe_blob.as_ref().map(|_| i))
.collect::<HashSet<_>>();
(block_opt, blobs)
})
.unwrap_or_default();
self.get_missing_blob_ids(block_root, block, Some(blob_indices))
} }
/// A `None` indicates blobs are not required. /// A `None` indicates blobs are not required.
@@ -215,10 +147,10 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
pub fn get_missing_blob_ids( pub fn get_missing_blob_ids(
&self, &self,
block_root: Hash256, block_root: Hash256,
block_opt: Option<&Arc<SignedBeaconBlock<T>>>, block_opt: Option<&Arc<SignedBeaconBlock<T::EthSpec>>>,
blobs_opt: Option<HashSet<usize>>, blobs_opt: Option<HashSet<usize>>,
) -> Option<Vec<BlobIdentifier>> { ) -> Option<Vec<BlobIdentifier>> {
let epoch = self.slot_clock.now()?.epoch(T::slots_per_epoch()); let epoch = self.slot_clock.now()?.epoch(T::EthSpec::slots_per_epoch());
self.da_check_required(epoch).then(|| { self.da_check_required(epoch).then(|| {
block_opt block_opt
@@ -228,8 +160,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
}) })
}) })
.unwrap_or_else(|| { .unwrap_or_else(|| {
let mut blob_ids = Vec::with_capacity(T::max_blobs_per_block()); let mut blob_ids = Vec::with_capacity(T::EthSpec::max_blobs_per_block());
for i in 0..T::max_blobs_per_block() { for i in 0..T::EthSpec::max_blobs_per_block() {
if blobs_opt.as_ref().map_or(true, |blobs| !blobs.contains(&i)) { if blobs_opt.as_ref().map_or(true, |blobs| !blobs.contains(&i)) {
blob_ids.push(BlobIdentifier { blob_ids.push(BlobIdentifier {
block_root, block_root,
@@ -253,8 +185,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
pub fn put_rpc_blobs( pub fn put_rpc_blobs(
&self, &self,
block_root: Hash256, block_root: Hash256,
blobs: FixedBlobSidecarList<T>, blobs: FixedBlobSidecarList<T::EthSpec>,
) -> Result<Availability<T>, AvailabilityCheckError> { ) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
// TODO(sean) we may duplicated kzg verification on some blobs we already have cached so we could optimize this // TODO(sean) we may duplicated kzg verification on some blobs we already have cached so we could optimize this
let mut verified_blobs = vec![]; let mut verified_blobs = vec![];
@@ -265,8 +197,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
} else { } else {
return Err(AvailabilityCheckError::KzgNotInitialized); return Err(AvailabilityCheckError::KzgNotInitialized);
}; };
self.availability_cache
self.put_kzg_verified_blobs(block_root, &verified_blobs) .put_kzg_verified_blobs(block_root, &verified_blobs)
} }
/// This first validates the KZG commitments included in the blob sidecar. /// This first validates the KZG commitments included in the blob sidecar.
@@ -287,53 +219,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
}; };
self.availability_cache self.availability_cache
.put_kzg_verified_blob(kzg_verified_blob) .put_kzg_verified_blobs(kzg_verified_blob.block_root(), &[kzg_verified_blob])
self.put_kzg_verified_blobs(kzg_verified_blob.block_root(), &[kzg_verified_blob])
}
fn put_kzg_verified_blobs(
&self,
block_root: Hash256,
kzg_verified_blobs: &[KzgVerifiedBlob<T>],
) -> Result<Availability<T>, AvailabilityCheckError> {
for blob in kzg_verified_blobs {
let blob_block_root = blob.block_root();
if blob_block_root != block_root {
return Err(AvailabilityCheckError::BlockBlobRootMismatch {
block_root,
blob_block_root,
});
}
}
let availability = match self.availability_cache.write().entry(block_root) {
Entry::Occupied(mut occupied_entry) => {
// All blobs reaching this cache should be gossip verified and gossip verification
// should filter duplicates, as well as validate indices.
let received_components = occupied_entry.get_mut();
for kzg_verified_blob in kzg_verified_blobs {
if let Some(maybe_verified_blob) = received_components
.verified_blobs
.get_mut(kzg_verified_blob.blob_index() as usize)
{
*maybe_verified_blob = Some(kzg_verified_blob.clone())
}
}
if let Some(executed_block) = received_components.executed_block.take() {
self.check_block_availability_maybe_cache(occupied_entry, executed_block)?
} else {
Availability::MissingComponents(block_root)
}
}
Entry::Vacant(vacant_entry) => {
vacant_entry.insert(ReceivedComponents::new_from_blobs(kzg_verified_blobs));
Availability::MissingComponents(block_root)
}
};
Ok(availability)
} }
/// Check if we have all the blobs for a block. If we do, return the Availability variant that /// Check if we have all the blobs for a block. If we do, return the Availability variant that
@@ -346,59 +232,6 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.put_pending_executed_block(executed_block) .put_pending_executed_block(executed_block)
} }
/// Checks if the provided `executed_block` contains all required blobs to be considered an
/// `AvailableBlock` based on blobs that are cached.
///
/// Returns an error if there was an error when matching the block commitments against blob commitments.
///
/// Returns `Ok(Availability::Available(_))` if all blobs for the block are present in cache.
/// Returns `Ok(Availability::PendingBlobs(_))` if all corresponding blobs have not been received in the cache.
fn check_block_availability_maybe_cache(
&self,
mut occupied_entry: OccupiedEntry<Hash256, ReceivedComponents<T>>,
executed_block: AvailabilityPendingExecutedBlock<T>,
) -> Result<Availability<T>, AvailabilityCheckError> {
if occupied_entry.get().has_all_blobs(&executed_block) {
let num_blobs_expected = executed_block.num_blobs_expected();
let block_root = executed_block.import_data.block_root;
let AvailabilityPendingExecutedBlock {
block,
import_data,
payload_verification_outcome,
} = executed_block;
let ReceivedComponents {
verified_blobs,
executed_block: _,
} = occupied_entry.remove();
let verified_blobs = Vec::from(verified_blobs)
.into_iter()
.take(num_blobs_expected)
.map(|maybe_blob| {
maybe_blob.ok_or(AvailabilityCheckError::MissingBlobs(block_root))
})
.collect::<Result<Vec<_>, _>>()?;
let available_block = self.make_available(block, verified_blobs)?;
Ok(Availability::Available(Box::new(
AvailableExecutedBlock::new(
available_block,
import_data,
payload_verification_outcome,
),
)))
} else {
let received_components = occupied_entry.get_mut();
let block_root = executed_block.import_data.block_root;
let _ = received_components.executed_block.insert(executed_block);
Ok(Availability::MissingComponents(block_root))
}
}
/// Checks if a block is available, returns a `MaybeAvailableBlock` that may include the fully /// Checks if a block is available, returns a `MaybeAvailableBlock` that may include the fully
/// available block. /// available block.
pub fn check_availability( pub fn check_availability(
@@ -422,27 +255,6 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
} }
} }
/// Checks if a block is available, returning an error if the block is not immediately available.
/// Does not access the gossip cache.
pub fn try_check_availability(
&self,
block: BlockWrapper<T::EthSpec>,
) -> Result<AvailableBlock<T::EthSpec>, AvailabilityCheckError> {
match block {
BlockWrapper::Block(block) => {
let blob_requirements = self.get_blob_requirements(&block)?;
let blobs = match blob_requirements {
BlobRequirements::EmptyBlobs => VerifiedBlobs::EmptyBlobs,
BlobRequirements::NotRequired => VerifiedBlobs::NotRequired,
BlobRequirements::PreDeneb => VerifiedBlobs::PreDeneb,
BlobRequirements::Required => return Err(AvailabilityCheckError::MissingBlobs),
};
Ok(AvailableBlock { block, blobs })
}
BlockWrapper::BlockAndBlobs(_, _) => Err(AvailabilityCheckError::Pending),
}
}
/// Verifies a block against a set of KZG verified blobs. Returns an AvailableBlock if block's /// Verifies a block against a set of KZG verified blobs. Returns an AvailableBlock if block's
/// commitments are consistent with the provided verified blob commitments. /// commitments are consistent with the provided verified blob commitments.
pub fn check_availability_with_blobs( pub fn check_availability_with_blobs(

View File

@@ -11,7 +11,7 @@ use ssz_derive::{Decode, Encode};
use ssz_types::FixedVector; use ssz_types::FixedVector;
use std::{collections::HashSet, sync::Arc}; use std::{collections::HashSet, sync::Arc};
use types::blob_sidecar::BlobIdentifier; use types::blob_sidecar::BlobIdentifier;
use types::{BlobSidecar, Epoch, EthSpec, Hash256}; use types::{BlobSidecar, Epoch, EthSpec, Hash256, SignedBeaconBlock};
/// Caches partially available blobs and execution verified blocks corresponding /// Caches partially available blobs and execution verified blocks corresponding
/// to a given `block_hash` that are received over gossip. /// to a given `block_hash` that are received over gossip.
@@ -25,10 +25,12 @@ pub struct PendingComponents<T: EthSpec> {
} }
impl<T: EthSpec> PendingComponents<T> { impl<T: EthSpec> PendingComponents<T> {
pub fn new_from_blob(blob: KzgVerifiedBlob<T>) -> Self { pub fn new_from_blobs(blobs: &[KzgVerifiedBlob<T>]) -> Self {
let mut verified_blobs = FixedVector::<_, _>::default(); let mut verified_blobs = FixedVector::<_, _>::default();
if let Some(mut_maybe_blob) = verified_blobs.get_mut(blob.blob_index() as usize) { for blob in blobs {
*mut_maybe_blob = Some(blob); if let Some(mut_maybe_blob) = verified_blobs.get_mut(blob.blob_index() as usize) {
*mut_maybe_blob = Some(blob.clone());
}
} }
Self { Self {
@@ -240,6 +242,12 @@ impl<T: BeaconChainTypes> Critical<T> {
Ok(()) Ok(())
} }
pub fn has_block(&self, block_root: &Hash256) -> bool {
self.in_memory
.peek(block_root)
.map_or(false, |cache| cache.executed_block.is_some())
}
/// This only checks for the blobs in memory /// This only checks for the blobs in memory
pub fn peek_blob( pub fn peek_blob(
&self, &self,
@@ -320,6 +328,34 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
}) })
} }
pub fn has_block(&self, block_root: &Hash256) -> bool {
self.critical.read().has_block(block_root)
}
pub fn get_missing_blob_ids_checking_cache(
&self,
block_root: Hash256,
) -> (Option<Arc<SignedBeaconBlock<T::EthSpec>>>, HashSet<usize>) {
self.critical
.read()
.in_memory
.peek(&block_root)
.map(|cache| {
let block_opt = cache
.executed_block
.as_ref()
.map(|block| block.block.block.clone());
let blobs = cache
.verified_blobs
.iter()
.enumerate()
.filter_map(|(i, maybe_blob)| maybe_blob.as_ref().map(|_| i))
.collect::<HashSet<_>>();
(block_opt, blobs)
})
.unwrap_or_default()
}
pub fn peek_blob( pub fn peek_blob(
&self, &self,
blob_id: &BlobIdentifier, blob_id: &BlobIdentifier,
@@ -335,27 +371,39 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
} }
} }
pub fn put_kzg_verified_blob( pub fn put_kzg_verified_blobs(
&self, &self,
kzg_verified_blob: KzgVerifiedBlob<T::EthSpec>, block_root: Hash256,
kzg_verified_blobs: &[KzgVerifiedBlob<T::EthSpec>],
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> { ) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
for blob in kzg_verified_blobs {
let blob_block_root = blob.block_root();
if blob_block_root != block_root {
return Err(AvailabilityCheckError::BlockBlobRootMismatch {
block_root,
blob_block_root,
});
}
}
let mut write_lock = self.critical.write(); let mut write_lock = self.critical.write();
let block_root = kzg_verified_blob.block_root();
let availability = if let Some(mut pending_components) = let availability = if let Some(mut pending_components) =
write_lock.pop_pending_components(block_root, &self.overflow_store)? write_lock.pop_pending_components(block_root, &self.overflow_store)?
{ {
let blob_index = kzg_verified_blob.blob_index(); for kzg_verified_blob in kzg_verified_blobs {
*pending_components let blob_index = kzg_verified_blob.blob_index() as usize;
.verified_blobs if let Some(maybe_verified_blob) =
.get_mut(blob_index as usize) pending_components.verified_blobs.get_mut(blob_index)
.ok_or(AvailabilityCheckError::BlobIndexInvalid(blob_index))? = {
Some(kzg_verified_blob); *maybe_verified_blob = Some(kzg_verified_blob.clone())
} else {
return Err(AvailabilityCheckError::BlobIndexInvalid(blob_index as u64));
}
}
if let Some(executed_block) = pending_components.executed_block.take() { if let Some(executed_block) = pending_components.executed_block.take() {
self.check_block_availability_maybe_cache( self.check_block_availability_maybe_cache(
write_lock, write_lock,
block_root,
pending_components, pending_components,
executed_block, executed_block,
)? )?
@@ -365,17 +413,17 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
pending_components, pending_components,
&self.overflow_store, &self.overflow_store,
)?; )?;
Availability::PendingBlock(block_root) Availability::MissingComponents(block_root)
} }
} else { } else {
// not in memory or store -> put new in memory // not in memory or store -> put new in memory
let new_pending_components = PendingComponents::new_from_blob(kzg_verified_blob); let new_pending_components = PendingComponents::new_from_blobs(kzg_verified_blobs);
write_lock.put_pending_components( write_lock.put_pending_components(
block_root, block_root,
new_pending_components, new_pending_components,
&self.overflow_store, &self.overflow_store,
)?; )?;
Availability::PendingBlock(block_root) Availability::MissingComponents(block_root)
}; };
Ok(availability) Ok(availability)
@@ -394,7 +442,6 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
match write_lock.pop_pending_components(block_root, &self.overflow_store)? { match write_lock.pop_pending_components(block_root, &self.overflow_store)? {
Some(pending_components) => self.check_block_availability_maybe_cache( Some(pending_components) => self.check_block_availability_maybe_cache(
write_lock, write_lock,
block_root,
pending_components, pending_components,
executed_block, executed_block,
)?, )?,
@@ -422,7 +469,7 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
new_pending_components, new_pending_components,
&self.overflow_store, &self.overflow_store,
)?; )?;
Availability::PendingBlobs(all_blob_ids) Availability::MissingComponents(block_root)
} }
}; };
@@ -435,11 +482,10 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
/// Returns an error if there was an error when matching the block commitments against blob commitments. /// Returns an error if there was an error when matching the block commitments against blob commitments.
/// ///
/// Returns `Ok(Availability::Available(_))` if all blobs for the block are present in cache. /// Returns `Ok(Availability::Available(_))` if all blobs for the block are present in cache.
/// Returns `Ok(Availability::PendingBlobs(_))` if all corresponding blobs have not been received in the cache. /// Returns `Ok(Availability::MissingComponents(_))` if all corresponding blobs have not been received in the cache.
fn check_block_availability_maybe_cache( fn check_block_availability_maybe_cache(
&self, &self,
mut write_lock: RwLockWriteGuard<Critical<T>>, mut write_lock: RwLockWriteGuard<Critical<T>>,
block_root: Hash256,
mut pending_components: PendingComponents<T::EthSpec>, mut pending_components: PendingComponents<T::EthSpec>,
executed_block: AvailabilityPendingExecutedBlock<T::EthSpec>, executed_block: AvailabilityPendingExecutedBlock<T::EthSpec>,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> { ) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
@@ -466,14 +512,7 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
), ),
))) )))
} else { } else {
let missing_blob_ids = executed_block.get_filtered_blob_ids(|index| { let block_root = executed_block.import_data.block_root;
pending_components
.verified_blobs
.get(index as usize)
.map(|maybe_blob| maybe_blob.is_none())
.unwrap_or(true)
});
let _ = pending_components.executed_block.insert(executed_block); let _ = pending_components.executed_block.insert(executed_block);
write_lock.put_pending_components( write_lock.put_pending_components(
block_root, block_root,
@@ -481,7 +520,7 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
&self.overflow_store, &self.overflow_store,
)?; )?;
Ok(Availability::PendingBlobs(missing_blob_ids)) Ok(Availability::MissingComponents(block_root))
} }
} }
@@ -1080,7 +1119,7 @@ mod test {
); );
} else { } else {
assert!( assert!(
matches!(availability, Availability::PendingBlobs(_)), matches!(availability, Availability::MissingComponents(_)),
"should be pending blobs" "should be pending blobs"
); );
assert_eq!( assert_eq!(
@@ -1109,7 +1148,7 @@ mod test {
if blob_index == blobs_expected - 1 { if blob_index == blobs_expected - 1 {
assert!(matches!(availability, Availability::Available(_))); assert!(matches!(availability, Availability::Available(_)));
} else { } else {
assert!(matches!(availability, Availability::PendingBlobs(_))); assert!(matches!(availability, Availability::MissingComponents(_)));
assert_eq!(cache.critical.read().in_memory.len(), 1); assert_eq!(cache.critical.read().in_memory.len(), 1);
} }
} }
@@ -1134,7 +1173,7 @@ mod test {
.expect("should put blob"); .expect("should put blob");
assert_eq!( assert_eq!(
availability, availability,
Availability::PendingBlock(root), Availability::MissingComponents(root),
"should be pending block" "should be pending block"
); );
assert_eq!(cache.critical.read().in_memory.len(), 1); assert_eq!(cache.critical.read().in_memory.len(), 1);
@@ -1284,7 +1323,7 @@ mod test {
cache.critical.read().in_memory.peek(&roots[0]).is_some(), cache.critical.read().in_memory.peek(&roots[0]).is_some(),
"first block should be in memory" "first block should be in memory"
); );
assert!(matches!(availability, Availability::PendingBlobs(_))); assert!(matches!(availability, Availability::MissingComponents(_)));
} }
} }
assert_eq!( assert_eq!(
@@ -1374,14 +1413,14 @@ mod test {
.put_pending_executed_block(pending_block) .put_pending_executed_block(pending_block)
.expect("should put block"); .expect("should put block");
assert!( assert!(
matches!(availability, Availability::PendingBlobs(_)), matches!(availability, Availability::MissingComponents(_)),
"should have pending blobs" "should have pending blobs"
); );
let availability = cache let availability = cache
.put_kzg_verified_blob(kzg_verified_blob) .put_kzg_verified_blob(kzg_verified_blob)
.expect("should put blob"); .expect("should put blob");
assert!( assert!(
matches!(availability, Availability::PendingBlobs(_)), matches!(availability, Availability::MissingComponents(_)),
"availabilty should be pending blobs: {:?}", "availabilty should be pending blobs: {:?}",
availability availability
); );
@@ -1392,14 +1431,14 @@ mod test {
let root = pending_block.block.as_block().canonical_root(); let root = pending_block.block.as_block().canonical_root();
assert_eq!( assert_eq!(
availability, availability,
Availability::PendingBlock(root), Availability::MissingComponents(root),
"should be pending block" "should be pending block"
); );
let availability = cache let availability = cache
.put_pending_executed_block(pending_block) .put_pending_executed_block(pending_block)
.expect("should put block"); .expect("should put block");
assert!( assert!(
matches!(availability, Availability::PendingBlobs(_)), matches!(availability, Availability::MissingComponents(_)),
"should have pending blobs" "should have pending blobs"
); );
} }
@@ -1410,7 +1449,7 @@ mod test {
.put_pending_executed_block(pending_block) .put_pending_executed_block(pending_block)
.expect("should put block"); .expect("should put block");
assert!( assert!(
matches!(availability, Availability::PendingBlobs(_)), matches!(availability, Availability::MissingComponents(_)),
"should be pending blobs" "should be pending blobs"
); );
} }
@@ -1527,14 +1566,14 @@ mod test {
.put_pending_executed_block(pending_block) .put_pending_executed_block(pending_block)
.expect("should put block"); .expect("should put block");
assert!( assert!(
matches!(availability, Availability::PendingBlobs(_)), matches!(availability, Availability::MissingComponents(_)),
"should have pending blobs" "should have pending blobs"
); );
let availability = cache let availability = cache
.put_kzg_verified_blob(kzg_verified_blob) .put_kzg_verified_blob(kzg_verified_blob)
.expect("should put blob"); .expect("should put blob");
assert!( assert!(
matches!(availability, Availability::PendingBlobs(_)), matches!(availability, Availability::MissingComponents(_)),
"availabilty should be pending blobs: {:?}", "availabilty should be pending blobs: {:?}",
availability availability
); );
@@ -1545,14 +1584,14 @@ mod test {
let root = pending_block.block.as_block().canonical_root(); let root = pending_block.block.as_block().canonical_root();
assert_eq!( assert_eq!(
availability, availability,
Availability::PendingBlock(root), Availability::MissingComponents(root),
"should be pending block" "should be pending block"
); );
let availability = cache let availability = cache
.put_pending_executed_block(pending_block) .put_pending_executed_block(pending_block)
.expect("should put block"); .expect("should put block");
assert!( assert!(
matches!(availability, Availability::PendingBlobs(_)), matches!(availability, Availability::MissingComponents(_)),
"should have pending blobs" "should have pending blobs"
); );
} }
@@ -1564,7 +1603,7 @@ mod test {
.put_pending_executed_block(pending_block) .put_pending_executed_block(pending_block)
.expect("should put block"); .expect("should put block");
assert!( assert!(
matches!(availability, Availability::PendingBlobs(_)), matches!(availability, Availability::MissingComponents(_)),
"should be pending blobs" "should be pending blobs"
); );
} }
@@ -1637,7 +1676,7 @@ mod test {
if i == additional_blobs - 1 { if i == additional_blobs - 1 {
assert!(matches!(availability, Availability::Available(_))) assert!(matches!(availability, Availability::Available(_)))
} else { } else {
assert!(matches!(availability, Availability::PendingBlobs(_))); assert!(matches!(availability, Availability::MissingComponents(_)));
} }
} }
} }

View File

@@ -658,34 +658,34 @@ pub fn generate_random_blobs<T: EthSpec>(
let versioned_hash = commitment.calculate_versioned_hash(); let versioned_hash = commitment.calculate_versioned_hash();
let blob_transaction = BlobTransaction { let blob_transaction = BlobTransaction {
chain_id: Default::default(), chain_id: Default::default(),
nonce: 0, nonce: 0,
max_priority_fee_per_gas: Default::default(), max_priority_fee_per_gas: Default::default(),
max_fee_per_gas: Default::default(), max_fee_per_gas: Default::default(),
gas: 100000, gas: 100000,
to: None, to: None,
value: Default::default(), value: Default::default(),
data: Default::default(), data: Default::default(),
access_list: Default::default(), access_list: Default::default(),
max_fee_per_data_gas: Default::default(), max_fee_per_data_gas: Default::default(),
versioned_hashes: vec![versioned_hash].into(), versioned_hashes: vec![versioned_hash].into(),
}; };
let bad_signature = EcdsaSignature { let bad_signature = EcdsaSignature {
y_parity: false, y_parity: false,
r: Uint256::from(0), r: Uint256::from(0),
s: Uint256::from(0), s: Uint256::from(0),
}; };
let signed_blob_transaction = SignedBlobTransaction { let signed_blob_transaction = SignedBlobTransaction {
message: blob_transaction, message: blob_transaction,
signature: bad_signature, signature: bad_signature,
}; };
// calculate transaction bytes // calculate transaction bytes
let tx_bytes = [BLOB_TX_TYPE] let tx_bytes = [BLOB_TX_TYPE]
.into_iter() .into_iter()
.chain(signed_blob_transaction.as_ssz_bytes().into_iter()) .chain(signed_blob_transaction.as_ssz_bytes().into_iter())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let tx = Transaction::<T::MaxBytesPerTransaction>::from(tx_bytes); let tx = Transaction::<T::MaxBytesPerTransaction>::from(tx_bytes);
transactions.push(tx); transactions.push(tx);
bundle bundle

View File

@@ -1,5 +1,5 @@
use beacon_chain::blob_verification::{AsBlock, BlockWrapper}; use beacon_chain::blob_verification::{AsBlock, BlockWrapper};
use beacon_chain::data_availability_checker::{DataAvailabilityChecker}; use beacon_chain::data_availability_checker::DataAvailabilityChecker;
use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError};
use lighthouse_network::rpc::RPCError; use lighthouse_network::rpc::RPCError;
use lighthouse_network::{PeerAction, PeerId}; use lighthouse_network::{PeerAction, PeerId};
@@ -60,7 +60,7 @@ pub(crate) struct BlockLookups<T: BeaconChainTypes> {
SingleBlockLookup<SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, T>, SingleBlockLookup<SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, T>,
)>, )>,
da_checker: Arc<DataAvailabilityChecker<T::EthSpec, T::SlotClock>>, da_checker: Arc<DataAvailabilityChecker<T>>,
/// The logger for the import manager. /// The logger for the import manager.
log: Logger, log: Logger,
@@ -121,10 +121,7 @@ pub enum ShouldRemoveLookup {
} }
impl<T: BeaconChainTypes> BlockLookups<T> { impl<T: BeaconChainTypes> BlockLookups<T> {
pub fn new( pub fn new(da_checker: Arc<DataAvailabilityChecker<T>>, log: Logger) -> Self {
da_checker: Arc<DataAvailabilityChecker<T::EthSpec, T::SlotClock>>,
log: Logger,
) -> Self {
Self { Self {
parent_lookups: Default::default(), parent_lookups: Default::default(),
processing_parent_lookups: Default::default(), processing_parent_lookups: Default::default(),
@@ -540,7 +537,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
if !outstanding_blobs_req { if !outstanding_blobs_req {
if let Ok(peer_id) = parent_lookup if let Ok(peer_id) = parent_lookup
.current_parent_request .current_parent_request
.downloading_peer(ResponseType::Blob) { .downloading_peer(ResponseType::Blob)
{
cx.report_peer( cx.report_peer(
peer_id.to_peer_id(), peer_id.to_peer_id(),
PeerAction::MidToleranceError, PeerAction::MidToleranceError,
@@ -622,9 +620,11 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
seen_timestamp: Duration, seen_timestamp: Duration,
cx: &mut SyncNetworkContext<T>, cx: &mut SyncNetworkContext<T>,
) { ) {
let mut parent_lookup = if let Some(pos) = self.parent_lookups.iter().position(|request| { let mut parent_lookup = if let Some(pos) = self
request.pending_blob_response(id) .parent_lookups
}) { .iter()
.position(|request| request.pending_blob_response(id))
{
self.parent_lookups.remove(pos) self.parent_lookups.remove(pos)
} else { } else {
if blob.is_some() { if blob.is_some() {
@@ -1055,7 +1055,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
cx, cx,
&self.log, &self.log,
) )
} else if let Some(block_id_ref) = block_id_ref { } else if let Some(block_id_ref) = block_id_ref {
// Try it again if possible. // Try it again if possible.
retry_request_after_failure( retry_request_after_failure(
block_id_ref, block_id_ref,
@@ -1488,8 +1488,6 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
} }
} }
fn handle_block_lookup_verify_error<T: BeaconChainTypes>( fn handle_block_lookup_verify_error<T: BeaconChainTypes>(
request_id_ref: &mut u32, request_id_ref: &mut u32,
request_ref: &mut SingleBlockLookup<SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, T>, request_ref: &mut SingleBlockLookup<SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, T>,

View File

@@ -63,25 +63,6 @@ pub enum RequestError {
NoPeers, NoPeers,
} }
#[derive(Debug)]
pub enum LookupDownloadStatus<T: EthSpec> {
Process(BlockWrapper<T>),
SearchBlock(Hash256),
AvailabilityCheck(AvailabilityCheckError),
}
impl<T: EthSpec> From<Result<BlockWrapper<T>, AvailabilityCheckError>> for LookupDownloadStatus<T> {
fn from(value: Result<BlockWrapper<T>, AvailabilityCheckError>) -> Self {
match value {
Ok(wrapper) => LookupDownloadStatus::Process(wrapper),
Err(AvailabilityCheckError::MissingBlobs(block_root)) => {
LookupDownloadStatus::SearchBlock(block_root)
}
Err(e) => LookupDownloadStatus::AvailabilityCheck(e),
}
}
}
impl<T: BeaconChainTypes> ParentLookup<T> { impl<T: BeaconChainTypes> ParentLookup<T> {
pub fn contains_block(&self, block_root: &Hash256) -> bool { pub fn contains_block(&self, block_root: &Hash256) -> bool {
self.downloaded_blocks self.downloaded_blocks
@@ -93,7 +74,7 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
block_root: Hash256, block_root: Hash256,
parent_root: Hash256, parent_root: Hash256,
peer_id: PeerShouldHave, peer_id: PeerShouldHave,
da_checker: Arc<DataAvailabilityChecker<T::EthSpec, T::SlotClock>>, da_checker: Arc<DataAvailabilityChecker<T>>,
) -> Self { ) -> Self {
let current_parent_request = let current_parent_request =
SingleBlockLookup::new(parent_root, Some(<_>::default()), peer_id, da_checker); SingleBlockLookup::new(parent_root, Some(<_>::default()), peer_id, da_checker);
@@ -189,10 +170,7 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
Some(UnknownParentComponents::default()); Some(UnknownParentComponents::default());
} }
pub fn add_current_request_block( pub fn add_current_request_block(&mut self, block: Arc<SignedBeaconBlock<T::EthSpec>>) {
&mut self,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
) {
// Cache the block. // Cache the block.
self.current_parent_request.add_unknown_parent_block(block); self.current_parent_request.add_unknown_parent_block(block);

View File

@@ -23,7 +23,7 @@ pub struct SingleBlockLookup<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> {
pub blob_download_queue: FixedBlobSidecarList<T::EthSpec>, pub blob_download_queue: FixedBlobSidecarList<T::EthSpec>,
pub block_request_state: SingleLookupRequestState<MAX_ATTEMPTS>, pub block_request_state: SingleLookupRequestState<MAX_ATTEMPTS>,
pub blob_request_state: SingleLookupRequestState<MAX_ATTEMPTS>, pub blob_request_state: SingleLookupRequestState<MAX_ATTEMPTS>,
pub da_checker: Arc<DataAvailabilityChecker<T::EthSpec, T::SlotClock>>, pub da_checker: Arc<DataAvailabilityChecker<T>>,
/// Only necessary for requests triggered by an `UnkownParent` because any /// Only necessary for requests triggered by an `UnkownParent` because any
/// blocks or blobs without parents won't hit the data availability cache. /// blocks or blobs without parents won't hit the data availability cache.
pub unknown_parent_components: Option<UnknownParentComponents<T::EthSpec>>, pub unknown_parent_components: Option<UnknownParentComponents<T::EthSpec>>,
@@ -110,7 +110,7 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
requested_block_root: Hash256, requested_block_root: Hash256,
unknown_parent_components: Option<UnknownParentComponents<T::EthSpec>>, unknown_parent_components: Option<UnknownParentComponents<T::EthSpec>>,
peer_source: PeerShouldHave, peer_source: PeerShouldHave,
da_checker: Arc<DataAvailabilityChecker<T::EthSpec, T::SlotClock>>, da_checker: Arc<DataAvailabilityChecker<T>>,
) -> Self { ) -> Self {
Self { Self {
requested_block_root, requested_block_root,