diff --git a/beacon_node/beacon_chain/src/execution_payload.rs b/beacon_node/beacon_chain/src/execution_payload.rs index b561b6ea1e..bb2d3c5b44 100644 --- a/beacon_node/beacon_chain/src/execution_payload.rs +++ b/beacon_node/beacon_chain/src/execution_payload.rs @@ -134,9 +134,9 @@ impl PayloadNotifier { /// contains a few extra checks by running `partially_verify_execution_payload` first: /// /// https://github.com/ethereum/consensus-specs/blob/v1.1.9/specs/bellatrix/beacon-chain.md#notify_new_payload -async fn notify_new_payload<'a, T: BeaconChainTypes>( +async fn notify_new_payload( chain: &Arc>, - block: BeaconBlockRef<'a, T::EthSpec>, + block: BeaconBlockRef, ) -> Result> { let execution_payload = block.execution_payload()?; diff --git a/beacon_node/beacon_chain/src/gossip_blob_cache.rs b/beacon_node/beacon_chain/src/gossip_blob_cache.rs index 9243234e77..2904a2bb85 100644 --- a/beacon_node/beacon_chain/src/gossip_blob_cache.rs +++ b/beacon_node/beacon_chain/src/gossip_blob_cache.rs @@ -1,111 +1,107 @@ -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, HashMap, HashSet}; +use std::sync::Arc; +use parking_lot::{Mutex, RwLock}; use kzg::KzgCommitment; use ssz_types::VariableList; use types::blob_sidecar::{BlobIdentifier, BlobSidecar}; use types::{EthSpec, Hash256}; -use crate::blob_verification::verify_data_availability; +use crate::blob_verification::{AvailabilityPendingBlock, verify_data_availability}; +use crate::block_verification::IntoExecutionPendingBlock; -/// Only need to put when we get a blob -/// Only need to get when we have a block we want to verify +/// This cache contains +/// - blobs that have been gossip verified +/// - commitments for blocks that have been gossip verified, but the commitments themselves +/// have not been verified against blobs +/// - blocks that have been fully verified and only require a data availability check pub struct GossipBlobCache { - sender: tokio::sync::mpsc::Sender>, - thread: tokio::task::JoinHandle<()>, + blob_cache: Mutex> } -pub enum Operation { - DataAvailabilityCheck(DataAvailabilityRequest), - Put(BlobSidecar), -} +struct GossipBlobCacheInner { + // used when all blobs are not yet present and when the block is not yet present -pub struct DataAvailabilityRequest { - block_root: Hash256, - kzg_commitments: VariableList, - sender: oneshot_broadcast::Sender, T::MaxBlobsPerBlock>>, + //TODO(sean) do we want two versions of this cache, one meant to serve RPC? + unverified_blobs: BTreeMap>>, + // used when the block was fully processed before we received all blobs + availability_pending_blocks: HashMap>, + // used to cache kzg commitments from gossip verified blocks in case we receive all blobs during block processing + unverified_commitments: HashMap>, + // used when block + blob kzg verification completes prior before block processing + verified_commitments: HashSet, } impl GossipBlobCache { pub fn new() -> Self { - //TODO(sean) figure out capacity - let (tx, mut rx) = tokio::sync::mpsc::channel::>(1000); - - - let thread = tokio::task::spawn(async move || { - let mut unverified_blobs: BTreeMap> = BTreeMap::new(); - let mut verified_blobs: HashMap, T::MaxBlobsPerBlock>>= HashMap::new(); - let mut requests: HashMap> = HashMap::new(); - while let Some(op) = rx.recv().await { - // check if we already have a verified set of blobs for this, if so ignore - // check if we can complete a set of blobs and verify - // -- if yes, do it, then check if there are outstanding requests we can resolve, and resolve them - // -- -- spawn a thread that does verification - // -- if no, add to unverified blobs - - match op { - Operation::Put(blob) => { - let blob_id = blob.id(); - if !unverified_blobs.contains_key(&blob_id) { - unverified_blobs.insert(blob_id, blob) - } - - if !verified_blobs.contains_key(&blob.block_root) { - // ignore - if let Some(request) = requests.get(&blob.block_root) { - let expected_blob_count = request.kzg_commitments.len(); - - let mut blobs = unverified_blobs.range(BlobIdentifier::new(blob.block_root, 0)..BlobIdentifier::new(blob.block_root, expected_blob_count as u64)); - - for (index, (_, blob)) in blobs.enumerate() { - // find missing blobs and trigger a request - } - - verify_data_availability(blob, request.kzg_commitments); - verified_blobs.put(blob.block_root, blob); - - request.sender.send(result) - } - // check if the request can be completed, and if so complete it - } - } - Operation::DataAvailabilityCheck(request) => { - if let Some(verified_blobs) = verified_blobs.get(&blob.block_root) { - request.sender.send(result) - } else { - requests.insert(request.block_root, request) - } - } - Operation::GetBlobById(id) => { - unverified_blobs.get(id) - } - Operation::GetBlobsByBlockRoot((root, count)) => { - - } - } - - } - }); Self { - sender: tx, - thread, + blob_cache: Mutex::new(GossipBlobCacheInner { + unverified_blobs: BTreeMap::new(), + availability_pending_blocks: HashMap::new(), + unverified_commitments: HashMap::new(), + verified_commitments: HashSet::new(), + }) + } + + } + + /// When we receive a blob check if we've cached it. If it completes a set and we have the + /// corresponding commitments, verify the commitments. If it completes a set and we have a block + /// cached, verify the block and import it. + /// + /// This should only accept gossip verified blobs, so we should not have to worry about dupes. + pub fn put_blob(&self, blob: Arc>) { + let blob_id = blob.id(); + let blob_cache = self.blob_cache.lock(); + + if let Some(dup) = blob_cache.unverified_blobs.insert(blob_id, blob) { + // return error relating to gossip validation failure + } + + if let Some(availability_pending_block) = blob_cache.availability_pending_blocks.get(&blob.block_root) { + let num_blobs = availability_pending_block.kzg_commitments().len(); + let mut blobs : Vec> = blob_cache.unverified_blobs.range(BlobIdentifier::new(blob.block_root, 0) + ..BlobIdentifier::new(blob.block_root, num_blobs as u64)).collect(); + + if blobs.len() == num_blobs { + // verify + // import + } + } else if let Some(commitments) = blob_cache.unverified_commitments.get(&blob.block_root) { + let num_blobs = commitments.len(); + let mut blobs : Vec> = blob_cache.unverified_blobs.range(BlobIdentifier::new(blob.block_root, 0) + ..BlobIdentifier::new(blob.block_root, num_blobs as u64)).collect(); + + if blobs.len() == num_blobs { + // verify + // cache + } } } - pub fn put(&self, blob: BlobSidecar) { - self.sender.send(Operation::Put(blob)); + + pub fn put_commitments(&self, block_root: Hash256, kzg_commitments: VariableList) { + let blob_cache = self.blob_cache.lock(); + if let Some(dup) = blob_cache.unverified_commitments.insert(block_root, kzg_commitments) { + // return error relating to gossip validation failure + } + + let num_blobs = commitments.len(); + let mut blobs : Vec> = blob_cache.unverified_blobs.range(BlobIdentifier::new(blob.block_root, 0) + ..BlobIdentifier::new(blob.block_root, num_blobs as u64)).collect(); + + if blobs.len() == num_blobs { + // verify + // cache + } } - pub async fn get_verified(&self, block_root: Hash256, kzg_commitments: VariableList) -> Receiever, T::MaxBlobsPerBlock>> { - // check if there are verified blobs - // if not, check if not check if there's a request for this block already. - // -- if yes, await the join handle return - // -- if no, create new request entry (spawn a new thread?) - let (tx, rx) = tokio::sync::oneshot::channel(); - let req = DataAvailabilityRequest { - block_root, - kzg_commitments, - sender: tx, - }; - self.sender.send(Operation::DataAvailabilityCheck(req)); - rx + pub fn check_availability_and_import(&self, block_root: Hash256, block: AvailabilityPendingBlock) -> bool { + let blob_cache = self.blob_cache.lock(); + if blob_cache.verified_commitments.contains(&block_root) { + true + } else { + // cache the block + false + } } } diff --git a/consensus/types/src/blob_sidecar.rs b/consensus/types/src/blob_sidecar.rs index d4a323f4a3..c2d8e0aecd 100644 --- a/consensus/types/src/blob_sidecar.rs +++ b/consensus/types/src/blob_sidecar.rs @@ -1,9 +1,9 @@ use crate::test_utils::TestRandom; use crate::{Blob, EthSpec, Hash256, SignedRoot, Slot}; +use bls::Signature; use derivative::Derivative; use kzg::{KzgCommitment, KzgProof}; use serde_derive::{Deserialize, Serialize}; -use bls::Signature; use ssz::Encode; use ssz_derive::{Decode, Encode}; use test_random_derive::TestRandom; diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index 20ef091695..5db46c3445 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -229,6 +229,6 @@ pub use bls::{ pub use kzg::{KzgCommitment, KzgProof}; +use crate::blob_sidecar::BlobSidecar; pub use ssz_types::{typenum, typenum::Unsigned, BitList, BitVector, FixedVector, VariableList}; pub use superstruct::superstruct; -use crate::blob_sidecar::BlobSidecar;