cache cleanup

This commit is contained in:
realbigsean
2023-03-15 17:21:18 -04:00
parent 49862c7e48
commit 73ab4d85d7
7 changed files with 75 additions and 81 deletions

View File

@@ -6216,9 +6216,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Err(BlobError::TransactionCommitmentMismatch);
}
self.blob_cache
// Validatate that the kzg proof is valid against the commitments and blobs
// Validate that the kzg proof is valid against the commitments and blobs
let kzg = self
.kzg
.as_ref()

View File

@@ -1,45 +1,42 @@
use std::collections::{BTreeMap, HashMap, HashSet};
use std::future::Future;
use std::sync::Arc;
use parking_lot::{Mutex, RwLock};
use kzg::KzgCommitment;
use eth2::reqwest::header::Entry;
use kzg::{Kzg, KzgCommitment};
use ssz_types::VariableList;
use types::blob_sidecar::{BlobIdentifier, BlobSidecar};
use types::{EthSpec, Hash256};
use crate::blob_verification::{AvailabilityPendingBlock, verify_data_availability};
use crate::block_verification::IntoExecutionPendingBlock;
use crate::block_verification::{ExecutedBlock, IntoExecutionPendingBlock};
use crate::{BeaconChainError, BlockError};
use crate::kzg_utils::validate_blob;
pub enum BlobCacheError {
DuplicateBlob(Hash256)
}
/// This cache contains
/// - blobs that have been gossip verified
/// - commitments for blocks that have been gossip verified, but the commitments themselves
/// have not been verified against blobs
/// - blocks that have been fully verified and only require a data availability check
pub struct GossipBlobCache<T: EthSpec> {
blob_cache: Mutex<GossipBlobCacheInner<T>>
rpc_blob_cache: RwLock<HashMap<BlobIdentifier, Arc<BlobSidecar<T>>>>,
gossip_blob_cache: Mutex<HashMap<Hash256, GossipBlobCacheInner<T>>>,
kzg: Kzg,
}
struct GossipBlobCacheInner<T: EthSpec> {
// used when all blobs are not yet present and when the block is not yet present
//TODO(sean) do we want two versions of this cache, one meant to serve RPC?
unverified_blobs: BTreeMap<BlobIdentifier, Arc<BlobSidecar<T>>>,
// used when the block was fully processed before we received all blobs
availability_pending_blocks: HashMap<Hash256, AvailabilityPendingBlock<T>>,
// used to cache kzg commitments from gossip verified blocks in case we receive all blobs during block processing
unverified_commitments: HashMap<Hash256, VariableList<KzgCommitment, T::MaxBlobsPerBlock>>,
// used when block + blob kzg verification completes prior before block processing
verified_commitments: HashSet<Hash256>,
verified_blobs: Vec<Arc<BlobSidecar<T>>>,
executed_block: Option<ExecutedBlock<T>>,
}
impl <T: EthSpec> GossipBlobCache<T> {
pub fn new() -> Self {
pub fn new(kzg: Kzg) -> Self {
Self {
blob_cache: Mutex::new(GossipBlobCacheInner {
unverified_blobs: BTreeMap::new(),
availability_pending_blocks: HashMap::new(),
unverified_commitments: HashMap::new(),
verified_commitments: HashSet::new(),
})
rpc_blob_cache: RwLock::new(HashMap::new()),
gossip_blob_cache: Mutex::new(HashMap::new()),
kzg,
}
}
@@ -49,59 +46,52 @@ impl <T: EthSpec> GossipBlobCache<T> {
/// cached, verify the block and import it.
///
/// This should only accept gossip verified blobs, so we should not have to worry about dupes.
pub fn put_blob(&self, blob: Arc<BlobSidecar<T>>) {
let blob_id = blob.id();
let blob_cache = self.blob_cache.lock();
pub fn put_blob(&self, blob: Arc<BlobSidecar<T>>) -> Result<(),BlobCacheError> {
if let Some(dup) = blob_cache.unverified_blobs.insert(blob_id, blob) {
// return error relating to gossip validation failure
// TODO(remove clones)
let verified = validate_blob(&self.kzg, blob.blob.clone(), blob.kzg_commitment.clone(), blob.kzg_proof)?;
if verified {
let mut blob_cache = self.gossip_blob_cache.lock();
// Gossip cache.
blob_cache.entry(blob.block_root)
.and_modify(|mut inner| {
// All blobs reaching this cache should be gossip verified and gossip verification
// should filter duplicates, as well as validate indices.
inner.verified_blobs.insert(blob.index as usize, blob.clone());
if let Some (executed_block) = inner.executed_block.as_ref() {
// trigger reprocessing ?
}
})
.or_insert(GossipBlobCacheInner {
verified_blobs: vec![blob.clone()],
executed_block: None
});
drop(blob_cache);
// RPC cache.
self.rpc_blob_cache.write().insert(blob.id(), blob.clone());
}
if let Some(availability_pending_block) = blob_cache.availability_pending_blocks.get(&blob.block_root) {
let num_blobs = availability_pending_block.kzg_commitments().len();
let mut blobs : Vec<BlobIdentifier, BlobSidecar<T>> = blob_cache.unverified_blobs.range(BlobIdentifier::new(blob.block_root, 0)
..BlobIdentifier::new(blob.block_root, num_blobs as u64)).collect();
if blobs.len() == num_blobs {
// verify
// import
}
} else if let Some(commitments) = blob_cache.unverified_commitments.get(&blob.block_root) {
let num_blobs = commitments.len();
let mut blobs : Vec<BlobIdentifier, BlobSidecar<T>> = blob_cache.unverified_blobs.range(BlobIdentifier::new(blob.block_root, 0)
..BlobIdentifier::new(blob.block_root, num_blobs as u64)).collect();
if blobs.len() == num_blobs {
// verify
// cache
}
}
Ok(())
}
pub fn put_commitments(&self, block_root: Hash256, kzg_commitments: VariableList<KzgCommitment, T::MaxBlobsPerBlock>) {
let blob_cache = self.blob_cache.lock();
if let Some(dup) = blob_cache.unverified_commitments.insert(block_root, kzg_commitments) {
// return error relating to gossip validation failure
}
let num_blobs = commitments.len();
let mut blobs : Vec<BlobIdentifier, BlobSidecar<T>> = blob_cache.unverified_blobs.range(BlobIdentifier::new(blob.block_root, 0)
..BlobIdentifier::new(blob.block_root, num_blobs as u64)).collect();
if blobs.len() == num_blobs {
// verify
// cache
}
}
pub fn check_availability_and_import(&self, block_root: Hash256, block: AvailabilityPendingBlock<T>) -> bool {
let blob_cache = self.blob_cache.lock();
if blob_cache.verified_commitments.contains(&block_root) {
true
} else {
// cache the block
false
}
pub fn put_block(&self, block: ExecutedBlock<T>) -> () {
let mut guard = self.gossip_blob_cache.lock();
guard.entry(block.block_root).and_modify(|cache|{
if cache.verified_blobs == block.block.message_eip4844().blob_kzg_commitments() {
// send to reprocessing queue ?
} else if let Some(dup) = cache.executed_block.insert(block) {
// return error
} else {
// log that we cached it
}
}).or_insert(GossipBlobCacheInner {
verified_blobs: vec![],
executed_block: Some(block)
});
}
}

View File

@@ -44,12 +44,11 @@ use tree_hash::TreeHash;
use types::consts::eip4844::BLOB_TX_TYPE;
use types::transaction::{AccessTuple, BlobTransaction, EcdsaSignature, SignedBlobTransaction};
use types::Withdrawals;
use types::{AbstractExecPayload, BeaconStateError, ExecPayload, VersionedHash};
use types::{
blobs_sidecar::{Blobs, KzgCommitments},
BlindedPayload, BlockType, ChainSpec, Epoch, ExecutionBlockHash, ExecutionPayload,
BlindedPayload, Blobs, BlockType, ChainSpec, Epoch, ExecutionBlockHash, ExecutionPayload,
ExecutionPayloadCapella, ExecutionPayloadEip4844, ExecutionPayloadMerge, ForkName,
};
use types::{AbstractExecPayload, BeaconStateError, ExecPayload, VersionedHash};
use types::{
ProposerPreparationData, PublicKeyBytes, Signature, SignedBeaconBlock, Slot, Transaction,
Uint256,