From 73ab4d85d789cb2229b999ff81c73c199b901d3a Mon Sep 17 00:00:00 2001 From: realbigsean Date: Wed, 15 Mar 2023 17:21:18 -0400 Subject: [PATCH] cache cleanup --- beacon_node/beacon_chain/src/beacon_chain.rs | 4 +- .../beacon_chain/src/gossip_blob_cache.rs | 130 ++++++++---------- beacon_node/execution_layer/src/lib.rs | 5 +- consensus/types/src/beacon_block_body.rs | 2 +- consensus/types/src/blob_sidecar.rs | 11 +- consensus/types/src/lib.rs | 2 +- consensus/types/src/signed_beacon_block.rs | 2 +- 7 files changed, 75 insertions(+), 81 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index d403cbc597..2b90fcb63a 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -6216,9 +6216,7 @@ impl BeaconChain { return Err(BlobError::TransactionCommitmentMismatch); } - self.blob_cache - - // Validatate that the kzg proof is valid against the commitments and blobs + // Validate that the kzg proof is valid against the commitments and blobs let kzg = self .kzg .as_ref() diff --git a/beacon_node/beacon_chain/src/gossip_blob_cache.rs b/beacon_node/beacon_chain/src/gossip_blob_cache.rs index 2904a2bb85..c76d122a78 100644 --- a/beacon_node/beacon_chain/src/gossip_blob_cache.rs +++ b/beacon_node/beacon_chain/src/gossip_blob_cache.rs @@ -1,45 +1,42 @@ use std::collections::{BTreeMap, HashMap, HashSet}; +use std::future::Future; use std::sync::Arc; use parking_lot::{Mutex, RwLock}; -use kzg::KzgCommitment; +use eth2::reqwest::header::Entry; +use kzg::{Kzg, KzgCommitment}; use ssz_types::VariableList; use types::blob_sidecar::{BlobIdentifier, BlobSidecar}; use types::{EthSpec, Hash256}; use crate::blob_verification::{AvailabilityPendingBlock, verify_data_availability}; -use crate::block_verification::IntoExecutionPendingBlock; +use crate::block_verification::{ExecutedBlock, IntoExecutionPendingBlock}; +use crate::{BeaconChainError, BlockError}; +use crate::kzg_utils::validate_blob; +pub enum BlobCacheError { + DuplicateBlob(Hash256) +} /// This cache contains /// - blobs that have been gossip verified /// - commitments for blocks that have been gossip verified, but the commitments themselves /// have not been verified against blobs /// - blocks that have been fully verified and only require a data availability check pub struct GossipBlobCache { - blob_cache: Mutex> + rpc_blob_cache: RwLock>>>, + gossip_blob_cache: Mutex>>, + kzg: Kzg, } struct GossipBlobCacheInner { - // used when all blobs are not yet present and when the block is not yet present - - //TODO(sean) do we want two versions of this cache, one meant to serve RPC? - unverified_blobs: BTreeMap>>, - // used when the block was fully processed before we received all blobs - availability_pending_blocks: HashMap>, - // used to cache kzg commitments from gossip verified blocks in case we receive all blobs during block processing - unverified_commitments: HashMap>, - // used when block + blob kzg verification completes prior before block processing - verified_commitments: HashSet, + verified_blobs: Vec>>, + executed_block: Option>, } impl GossipBlobCache { - pub fn new() -> Self { - + pub fn new(kzg: Kzg) -> Self { Self { - blob_cache: Mutex::new(GossipBlobCacheInner { - unverified_blobs: BTreeMap::new(), - availability_pending_blocks: HashMap::new(), - unverified_commitments: HashMap::new(), - verified_commitments: HashSet::new(), - }) + rpc_blob_cache: RwLock::new(HashMap::new()), + gossip_blob_cache: Mutex::new(HashMap::new()), + kzg, } } @@ -49,59 +46,52 @@ impl GossipBlobCache { /// cached, verify the block and import it. /// /// This should only accept gossip verified blobs, so we should not have to worry about dupes. - pub fn put_blob(&self, blob: Arc>) { - let blob_id = blob.id(); - let blob_cache = self.blob_cache.lock(); + pub fn put_blob(&self, blob: Arc>) -> Result<(),BlobCacheError> { - if let Some(dup) = blob_cache.unverified_blobs.insert(blob_id, blob) { - // return error relating to gossip validation failure + // TODO(remove clones) + let verified = validate_blob(&self.kzg, blob.blob.clone(), blob.kzg_commitment.clone(), blob.kzg_proof)?; + + if verified { + let mut blob_cache = self.gossip_blob_cache.lock(); + + // Gossip cache. + blob_cache.entry(blob.block_root) + .and_modify(|mut inner| { + // All blobs reaching this cache should be gossip verified and gossip verification + // should filter duplicates, as well as validate indices. + inner.verified_blobs.insert(blob.index as usize, blob.clone()); + + if let Some (executed_block) = inner.executed_block.as_ref() { + // trigger reprocessing ? + } + }) + .or_insert(GossipBlobCacheInner { + verified_blobs: vec![blob.clone()], + executed_block: None + }); + + drop(blob_cache); + + // RPC cache. + self.rpc_blob_cache.write().insert(blob.id(), blob.clone()); } - if let Some(availability_pending_block) = blob_cache.availability_pending_blocks.get(&blob.block_root) { - let num_blobs = availability_pending_block.kzg_commitments().len(); - let mut blobs : Vec> = blob_cache.unverified_blobs.range(BlobIdentifier::new(blob.block_root, 0) - ..BlobIdentifier::new(blob.block_root, num_blobs as u64)).collect(); - - if blobs.len() == num_blobs { - // verify - // import - } - } else if let Some(commitments) = blob_cache.unverified_commitments.get(&blob.block_root) { - let num_blobs = commitments.len(); - let mut blobs : Vec> = blob_cache.unverified_blobs.range(BlobIdentifier::new(blob.block_root, 0) - ..BlobIdentifier::new(blob.block_root, num_blobs as u64)).collect(); - - if blobs.len() == num_blobs { - // verify - // cache - } - } + Ok(()) } - - pub fn put_commitments(&self, block_root: Hash256, kzg_commitments: VariableList) { - let blob_cache = self.blob_cache.lock(); - if let Some(dup) = blob_cache.unverified_commitments.insert(block_root, kzg_commitments) { - // return error relating to gossip validation failure - } - - let num_blobs = commitments.len(); - let mut blobs : Vec> = blob_cache.unverified_blobs.range(BlobIdentifier::new(blob.block_root, 0) - ..BlobIdentifier::new(blob.block_root, num_blobs as u64)).collect(); - - if blobs.len() == num_blobs { - // verify - // cache - } - } - - pub fn check_availability_and_import(&self, block_root: Hash256, block: AvailabilityPendingBlock) -> bool { - let blob_cache = self.blob_cache.lock(); - if blob_cache.verified_commitments.contains(&block_root) { - true - } else { - // cache the block - false - } + pub fn put_block(&self, block: ExecutedBlock) -> () { + let mut guard = self.gossip_blob_cache.lock(); + guard.entry(block.block_root).and_modify(|cache|{ + if cache.verified_blobs == block.block.message_eip4844().blob_kzg_commitments() { + // send to reprocessing queue ? + } else if let Some(dup) = cache.executed_block.insert(block) { + // return error + } else { + // log that we cached it + } + }).or_insert(GossipBlobCacheInner { + verified_blobs: vec![], + executed_block: Some(block) + }); } } diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index adfdcd9f6f..a7a8bdfe26 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -44,12 +44,11 @@ use tree_hash::TreeHash; use types::consts::eip4844::BLOB_TX_TYPE; use types::transaction::{AccessTuple, BlobTransaction, EcdsaSignature, SignedBlobTransaction}; use types::Withdrawals; +use types::{AbstractExecPayload, BeaconStateError, ExecPayload, VersionedHash}; use types::{ - blobs_sidecar::{Blobs, KzgCommitments}, - BlindedPayload, BlockType, ChainSpec, Epoch, ExecutionBlockHash, ExecutionPayload, + BlindedPayload, Blobs, BlockType, ChainSpec, Epoch, ExecutionBlockHash, ExecutionPayload, ExecutionPayloadCapella, ExecutionPayloadEip4844, ExecutionPayloadMerge, ForkName, }; -use types::{AbstractExecPayload, BeaconStateError, ExecPayload, VersionedHash}; use types::{ ProposerPreparationData, PublicKeyBytes, Signature, SignedBeaconBlock, Slot, Transaction, Uint256, diff --git a/consensus/types/src/beacon_block_body.rs b/consensus/types/src/beacon_block_body.rs index c717396522..ace5e0f081 100644 --- a/consensus/types/src/beacon_block_body.rs +++ b/consensus/types/src/beacon_block_body.rs @@ -1,5 +1,5 @@ +use crate::test_utils::TestRandom; use crate::*; -use crate::{blobs_sidecar::KzgCommitments, test_utils::TestRandom}; use derivative::Derivative; use serde_derive::{Deserialize, Serialize}; use ssz_derive::{Decode, Encode}; diff --git a/consensus/types/src/blob_sidecar.rs b/consensus/types/src/blob_sidecar.rs index 27523d588d..3484b5cdba 100644 --- a/consensus/types/src/blob_sidecar.rs +++ b/consensus/types/src/blob_sidecar.rs @@ -1,5 +1,6 @@ use crate::test_utils::TestRandom; use crate::{Blob, EthSpec, Hash256, SignedRoot, Slot}; +use bls::Signature; use derivative::Derivative; use kzg::{KzgCommitment, KzgProof}; use serde_derive::{Deserialize, Serialize}; @@ -34,7 +35,6 @@ pub struct BlobIdentifier { #[derivative(PartialEq, Hash(bound = "T: EthSpec"))] pub struct BlobSidecar { pub block_root: Hash256, - // TODO: fix the type, should fit in u8 as well #[serde(with = "eth2_serde_utils::quoted_u64")] pub index: u64, pub slot: Slot, @@ -52,6 +52,13 @@ pub type BlobSidecarList = VariableList, ::MaxBl impl SignedRoot for BlobSidecar {} impl BlobSidecar { + pub fn id(&self) -> BlobIdentifier { + BlobIdentifier { + block_root: self.block_root, + index: self.index, + } + } + pub fn empty() -> Self { Self::default() } @@ -61,4 +68,4 @@ impl BlobSidecar { // Fixed part Self::empty().as_ssz_bytes().len() } -} \ No newline at end of file +} diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index e0db5419bb..14f06bb51d 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -228,4 +228,4 @@ pub use bls::{ pub use kzg::{KzgCommitment, KzgProof}; pub use ssz_types::{typenum, typenum::Unsigned, BitList, BitVector, FixedVector, VariableList}; -pub use superstruct::superstruct; \ No newline at end of file +pub use superstruct::superstruct; diff --git a/consensus/types/src/signed_beacon_block.rs b/consensus/types/src/signed_beacon_block.rs index 00aad61ff4..ae59690bf2 100644 --- a/consensus/types/src/signed_beacon_block.rs +++ b/consensus/types/src/signed_beacon_block.rs @@ -581,4 +581,4 @@ mod test { assert_eq!(reconstructed, block); } } -} \ No newline at end of file +}