shard memory blob cache

This commit is contained in:
realbigsean
2023-03-09 07:38:18 -05:00
committed by Pawan Dhananjay
parent 736a24e35a
commit 13b54f7879
4 changed files with 89 additions and 93 deletions

View File

@@ -134,9 +134,9 @@ impl<T: BeaconChainTypes> PayloadNotifier<T> {
/// contains a few extra checks by running `partially_verify_execution_payload` first:
///
/// https://github.com/ethereum/consensus-specs/blob/v1.1.9/specs/bellatrix/beacon-chain.md#notify_new_payload
async fn notify_new_payload<'a, T: BeaconChainTypes>(
async fn notify_new_payload<T: BeaconChainTypes>(
chain: &Arc<BeaconChain<T>>,
block: BeaconBlockRef<'a, T::EthSpec>,
block: BeaconBlockRef<T::EthSpec>,
) -> Result<PayloadVerificationStatus, BlockError<T::EthSpec>> {
let execution_payload = block.execution_payload()?;

View File

@@ -1,111 +1,107 @@
use std::collections::{BTreeMap, HashMap};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use parking_lot::{Mutex, RwLock};
use kzg::KzgCommitment;
use ssz_types::VariableList;
use types::blob_sidecar::{BlobIdentifier, BlobSidecar};
use types::{EthSpec, Hash256};
use crate::blob_verification::verify_data_availability;
use crate::blob_verification::{AvailabilityPendingBlock, verify_data_availability};
use crate::block_verification::IntoExecutionPendingBlock;
/// Only need to put when we get a blob
/// Only need to get when we have a block we want to verify
/// This cache contains
/// - blobs that have been gossip verified
/// - commitments for blocks that have been gossip verified, but the commitments themselves
/// have not been verified against blobs
/// - blocks that have been fully verified and only require a data availability check
pub struct GossipBlobCache<T: EthSpec> {
sender: tokio::sync::mpsc::Sender<Operation<T>>,
thread: tokio::task::JoinHandle<()>,
blob_cache: Mutex<GossipBlobCacheInner<T>>
}
pub enum Operation<T: EthSpec> {
DataAvailabilityCheck(DataAvailabilityRequest<T>),
Put(BlobSidecar<T>),
}
struct GossipBlobCacheInner<T: EthSpec> {
// used when all blobs are not yet present and when the block is not yet present
pub struct DataAvailabilityRequest<T: EthSpec> {
block_root: Hash256,
kzg_commitments: VariableList<KzgCommitment, T::MaxBlobsPerBlock>,
sender: oneshot_broadcast::Sender<VariableList<BlobSidecar<T>, T::MaxBlobsPerBlock>>,
//TODO(sean) do we want two versions of this cache, one meant to serve RPC?
unverified_blobs: BTreeMap<BlobIdentifier, Arc<BlobSidecar<T>>>,
// used when the block was fully processed before we received all blobs
availability_pending_blocks: HashMap<Hash256, AvailabilityPendingBlock<T>>,
// used to cache kzg commitments from gossip verified blocks in case we receive all blobs during block processing
unverified_commitments: HashMap<Hash256, VariableList<KzgCommitment, T::MaxBlobsPerBlock>>,
// used when block + blob kzg verification completes prior before block processing
verified_commitments: HashSet<Hash256>,
}
impl <T: EthSpec> GossipBlobCache<T> {
pub fn new() -> Self {
//TODO(sean) figure out capacity
let (tx, mut rx) = tokio::sync::mpsc::channel::<Operation<T>>(1000);
let thread = tokio::task::spawn(async move || {
let mut unverified_blobs: BTreeMap<BlobIdentifier, BlobSidecar<T>> = BTreeMap::new();
let mut verified_blobs: HashMap<Hash256, VariableList<BlobSidecar<T>, T::MaxBlobsPerBlock>>= HashMap::new();
let mut requests: HashMap<Hash256, DataAvailabilityRequest<T>> = HashMap::new();
while let Some(op) = rx.recv().await {
// check if we already have a verified set of blobs for this, if so ignore
// check if we can complete a set of blobs and verify
// -- if yes, do it, then check if there are outstanding requests we can resolve, and resolve them
// -- -- spawn a thread that does verification
// -- if no, add to unverified blobs
match op {
Operation::Put(blob) => {
let blob_id = blob.id();
if !unverified_blobs.contains_key(&blob_id) {
unverified_blobs.insert(blob_id, blob)
}
if !verified_blobs.contains_key(&blob.block_root) {
// ignore
if let Some(request) = requests.get(&blob.block_root) {
let expected_blob_count = request.kzg_commitments.len();
let mut blobs = unverified_blobs.range(BlobIdentifier::new(blob.block_root, 0)..BlobIdentifier::new(blob.block_root, expected_blob_count as u64));
for (index, (_, blob)) in blobs.enumerate() {
// find missing blobs and trigger a request
}
verify_data_availability(blob, request.kzg_commitments);
verified_blobs.put(blob.block_root, blob);
request.sender.send(result)
}
// check if the request can be completed, and if so complete it
}
}
Operation::DataAvailabilityCheck(request) => {
if let Some(verified_blobs) = verified_blobs.get(&blob.block_root) {
request.sender.send(result)
} else {
requests.insert(request.block_root, request)
}
}
Operation::GetBlobById(id) => {
unverified_blobs.get(id)
}
Operation::GetBlobsByBlockRoot((root, count)) => {
}
}
}
});
Self {
sender: tx,
thread,
blob_cache: Mutex::new(GossipBlobCacheInner {
unverified_blobs: BTreeMap::new(),
availability_pending_blocks: HashMap::new(),
unverified_commitments: HashMap::new(),
verified_commitments: HashSet::new(),
})
}
}
/// When we receive a blob check if we've cached it. If it completes a set and we have the
/// corresponding commitments, verify the commitments. If it completes a set and we have a block
/// cached, verify the block and import it.
///
/// This should only accept gossip verified blobs, so we should not have to worry about dupes.
pub fn put_blob(&self, blob: Arc<BlobSidecar<T>>) {
let blob_id = blob.id();
let blob_cache = self.blob_cache.lock();
if let Some(dup) = blob_cache.unverified_blobs.insert(blob_id, blob) {
// return error relating to gossip validation failure
}
if let Some(availability_pending_block) = blob_cache.availability_pending_blocks.get(&blob.block_root) {
let num_blobs = availability_pending_block.kzg_commitments().len();
let mut blobs : Vec<BlobIdentifier, BlobSidecar<T>> = blob_cache.unverified_blobs.range(BlobIdentifier::new(blob.block_root, 0)
..BlobIdentifier::new(blob.block_root, num_blobs as u64)).collect();
if blobs.len() == num_blobs {
// verify
// import
}
} else if let Some(commitments) = blob_cache.unverified_commitments.get(&blob.block_root) {
let num_blobs = commitments.len();
let mut blobs : Vec<BlobIdentifier, BlobSidecar<T>> = blob_cache.unverified_blobs.range(BlobIdentifier::new(blob.block_root, 0)
..BlobIdentifier::new(blob.block_root, num_blobs as u64)).collect();
if blobs.len() == num_blobs {
// verify
// cache
}
}
}
pub fn put(&self, blob: BlobSidecar<T>) {
self.sender.send(Operation::Put(blob));
pub fn put_commitments(&self, block_root: Hash256, kzg_commitments: VariableList<KzgCommitment, T::MaxBlobsPerBlock>) {
let blob_cache = self.blob_cache.lock();
if let Some(dup) = blob_cache.unverified_commitments.insert(block_root, kzg_commitments) {
// return error relating to gossip validation failure
}
let num_blobs = commitments.len();
let mut blobs : Vec<BlobIdentifier, BlobSidecar<T>> = blob_cache.unverified_blobs.range(BlobIdentifier::new(blob.block_root, 0)
..BlobIdentifier::new(blob.block_root, num_blobs as u64)).collect();
if blobs.len() == num_blobs {
// verify
// cache
}
}
pub async fn get_verified(&self, block_root: Hash256, kzg_commitments: VariableList<KzgCommitment, T::MaxBlobsPerBlock>) -> Receiever<VariableList<BlobSidecar<T>, T::MaxBlobsPerBlock>> {
// check if there are verified blobs
// if not, check if not check if there's a request for this block already.
// -- if yes, await the join handle return
// -- if no, create new request entry (spawn a new thread?)
let (tx, rx) = tokio::sync::oneshot::channel();
let req = DataAvailabilityRequest {
block_root,
kzg_commitments,
sender: tx,
};
self.sender.send(Operation::DataAvailabilityCheck(req));
rx
pub fn check_availability_and_import(&self, block_root: Hash256, block: AvailabilityPendingBlock<T>) -> bool {
let blob_cache = self.blob_cache.lock();
if blob_cache.verified_commitments.contains(&block_root) {
true
} else {
// cache the block
false
}
}
}

View File

@@ -1,9 +1,9 @@
use crate::test_utils::TestRandom;
use crate::{Blob, EthSpec, Hash256, SignedRoot, Slot};
use bls::Signature;
use derivative::Derivative;
use kzg::{KzgCommitment, KzgProof};
use serde_derive::{Deserialize, Serialize};
use bls::Signature;
use ssz::Encode;
use ssz_derive::{Decode, Encode};
use test_random_derive::TestRandom;

View File

@@ -229,6 +229,6 @@ pub use bls::{
pub use kzg::{KzgCommitment, KzgProof};
use crate::blob_sidecar::BlobSidecar;
pub use ssz_types::{typenum, typenum::Unsigned, BitList, BitVector, FixedVector, VariableList};
pub use superstruct::superstruct;
use crate::blob_sidecar::BlobSidecar;