From 736a24e35a113362c55884832575f8032202f8f3 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Mon, 6 Mar 2023 12:04:31 -0500 Subject: [PATCH] add 'gossip blob cache' and start to clean up processing and transition types --- beacon_node/beacon_chain/src/beacon_chain.rs | 48 ++++- .../beacon_chain/src/blob_verification.rs | 167 ++++++------------ .../beacon_chain/src/block_verification.rs | 9 +- .../beacon_chain/src/gossip_blob_cache.rs | 111 ++++++++++++ beacon_node/beacon_chain/src/lib.rs | 1 + consensus/types/src/blob_sidecar.rs | 6 +- consensus/types/src/lib.rs | 3 + consensus/types/src/signed_beacon_block.rs | 8 - 8 files changed, 225 insertions(+), 128 deletions(-) create mode 100644 beacon_node/beacon_chain/src/gossip_blob_cache.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index d677a08d1e..584774d72a 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -7,7 +7,7 @@ use crate::attester_cache::{AttesterCache, AttesterCacheKey}; use crate::beacon_proposer_cache::compute_proposer_duties_from_head; use crate::beacon_proposer_cache::BeaconProposerCache; use crate::blob_cache::BlobCache; -use crate::blob_verification::{AsBlock, AvailabilityPendingBlock, AvailableBlock, BlockWrapper, IntoAvailableBlock}; +use crate::blob_verification::{AsBlock, AvailabilityPendingBlock, AvailableBlock, BlobError, BlockWrapper, IntoAvailableBlock}; use crate::block_times_cache::BlockTimesCache; use crate::block_verification::{ check_block_is_finalized_checkpoint_or_descendant, check_block_relevancy, get_block_root, @@ -97,10 +97,13 @@ use std::borrow::Cow; use std::cmp::Ordering; use std::collections::HashMap; use std::collections::HashSet; +use std::future::Future; use std::io::prelude::*; use std::marker::PhantomData; use std::sync::Arc; use std::time::{Duration, Instant}; +use tokio::task::JoinHandle; +use state_processing::per_block_processing::eip4844::eip4844::verify_kzg_commitments_against_transactions; use store::iter::{BlockRootsIterator, ParentRootBlockIterator, StateRootsIterator}; use store::{ DatabaseBlock, Error as DBError, HotColdDB, KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp, @@ -432,6 +435,7 @@ pub struct BeaconChain { /// Provides monitoring of a set of explicitly defined validators. pub validator_monitor: RwLock>, pub blob_cache: BlobCache, + pub blob_cache: BlobCache, pub kzg: Option>, } @@ -6143,6 +6147,48 @@ impl BeaconChain { .map(|fork_epoch| fork_epoch <= current_epoch) .unwrap_or(false)) } + + pub async fn check_data_availability(&self, block: Arc>) -> Result, Error> { + let kzg_commitments = block + .message() + .body() + .blob_kzg_commitments() + .map_err(|_| BlobError::KzgCommitmentMissing)?; + let transactions = block + .message() + .body() + .execution_payload_eip4844() + .map(|payload| payload.transactions()) + .map_err(|_| BlobError::TransactionsMissing)? + .ok_or(BlobError::TransactionsMissing)?; + + if verify_kzg_commitments_against_transactions::(transactions, kzg_commitments) + .is_err() + { + return Err(BlobError::TransactionCommitmentMismatch); + } + + self.blob_cache + + // Validatate that the kzg proof is valid against the commitments and blobs + let kzg = self + .kzg + .as_ref() + .ok_or(BlobError::TrustedSetupNotInitialized)?; + + if !kzg_utils::validate_blobs_sidecar( + kzg, + block_slot, + block_root, + kzg_commitments, + blob_sidecar, + ) + .map_err(BlobError::KzgError)? + { + return Err(BlobError::InvalidKzgProof); + } + Ok(()) + } } impl Drop for BeaconChain { diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index a5abad7453..4dce3cd670 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -2,18 +2,19 @@ use derivative::Derivative; use slot_clock::SlotClock; use std::sync::Arc; use tokio::task::JoinHandle; +use ssz_types::VariableList; use crate::beacon_chain::{BeaconChain, BeaconChainTypes, MAXIMUM_GOSSIP_CLOCK_DISPARITY}; use crate::block_verification::PayloadVerificationOutcome; use crate::{kzg_utils, BeaconChainError, BlockError}; use state_processing::per_block_processing::eip4844::eip4844::verify_kzg_commitments_against_transactions; -use types::signed_beacon_block::BlobReconstructionError; use types::{ BeaconBlockRef, BeaconStateError, BlobsSidecar, EthSpec, Hash256, KzgCommitment, - SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, SignedBeaconBlockHeader, Slot, + SignedBeaconBlock, SignedBeaconBlockHeader, Slot, Transactions, }; use types::{Epoch, ExecPayload}; +use types::blob_sidecar::BlobSidecar; #[derive(Debug)] pub enum BlobError { @@ -66,15 +67,6 @@ pub enum BlobError { InconsistentFork, } -impl From for BlobError { - fn from(e: BlobReconstructionError) -> Self { - match e { - BlobReconstructionError::UnavailableBlobs => BlobError::UnavailableBlobs, - BlobReconstructionError::InconsistentFork => BlobError::InconsistentFork, - } - } -} - impl From for BlobError { fn from(e: BeaconChainError) -> Self { BlobError::BeaconChainError(e) @@ -117,7 +109,7 @@ pub fn validate_blob_for_gossip( block_wrapper.into_availablilty_pending_block(block_root, chain) } -fn verify_data_availability( +pub fn verify_data_availability( blob_sidecar: &BlobsSidecar, kzg_commitments: &[KzgCommitment], transactions: &Transactions, @@ -152,51 +144,6 @@ fn verify_data_availability( // Ok(()) } -/// A wrapper over a [`SignedBeaconBlock`] or a [`SignedBeaconBlockAndBlobsSidecar`]. This makes no -/// claims about data availability and should not be used in consensus. This struct is useful in -/// networking when we want to send blocks around without consensus checks. -#[derive(Clone, Debug, Derivative)] -#[derivative(PartialEq, Hash(bound = "E: EthSpec"))] -pub enum BlockWrapper { - Block(Arc>), - BlockAndBlobs(Arc>, Arc>), -} - -impl BlockWrapper { - pub fn new( - block: Arc>, - blobs_sidecar: Option>>, - ) -> Self { - if let Some(blobs_sidecar) = blobs_sidecar { - BlockWrapper::BlockAndBlobs(block, blobs_sidecar) - } else { - BlockWrapper::Block(block) - } - } -} - -impl From> for BlockWrapper { - fn from(block: SignedBeaconBlock) -> Self { - BlockWrapper::Block(Arc::new(block)) - } -} - -impl From> for BlockWrapper { - fn from(block: SignedBeaconBlockAndBlobsSidecar) -> Self { - let SignedBeaconBlockAndBlobsSidecar { - beacon_block, - blobs_sidecar, - } = block; - BlockWrapper::BlockAndBlobs(beacon_block, blobs_sidecar) - } -} - -impl From>> for BlockWrapper { - fn from(block: Arc>) -> Self { - BlockWrapper::Block(block) - } -} - #[derive(Copy, Clone)] pub enum DataAvailabilityCheckRequired { Yes, @@ -209,42 +156,12 @@ impl BlockWrapper { block_root: Hash256, chain: &BeaconChain, ) -> Result, BlobError> { - let data_availability_boundary = chain.data_availability_boundary(); - let da_check_required = - data_availability_boundary.map_or(DataAvailabilityCheckRequired::No, |boundary| { - if self.slot().epoch(T::EthSpec::slots_per_epoch()) >= boundary { - DataAvailabilityCheckRequired::Yes - } else { - DataAvailabilityCheckRequired::No - } - }); match self { BlockWrapper::Block(block) => { AvailabilityPendingBlock::new(block, block_root, da_check_required) } BlockWrapper::BlockAndBlobs(block, blobs_sidecar) => { - if matches!(da_check_required, DataAvailabilityCheckRequired::Yes) { - let kzg_commitments = block - .message() - .body() - .blob_kzg_commitments() - .map_err(|_| BlobError::KzgCommitmentMissing)?; - let transactions = block - .message() - .body() - .execution_payload_eip4844() - .map(|payload| payload.transactions()) - .map_err(|_| BlobError::TransactionsMissing)? - .ok_or(BlobError::TransactionsMissing)?; - verify_data_availability( - &blobs_sidecar, - kzg_commitments, - transactions, - block.slot(), - block_root, - chain, - )?; - } + AvailabilityPendingBlock::new_with_blobs(block, blobs_sidecar, da_check_required) } @@ -264,24 +181,24 @@ pub trait IntoAvailableBlock { /// `AvailableBlock` has passed any required data availability checks and should be used in /// consensus. #[derive(Clone, Debug, Derivative)] -#[derivative(PartialEq, Hash(bound = "E: EthSpec"))] -pub struct AvailabilityPendingBlock { - block: Arc>, - data_availability_handle: DataAvailabilityHandle, +#[derivative(PartialEq, Hash(bound = "T: BeaconChainTypes"))] +pub struct AvailabilityPendingBlock { + block: Arc>, + data_availability_handle: DataAvailabilityHandle, } /// Used to await the result of data availability check. -type DataAvailabilityHandle = JoinHandle>>, BlobError>>; +type DataAvailabilityHandle = JoinHandle>, BlobError>>; #[derive(Clone, Debug, Derivative)] -#[derivative(PartialEq, Hash(bound = "E: EthSpec"))] -pub struct AvailableBlock { - block: Arc>, - blobs: Blobs, +#[derivative(PartialEq, Hash(bound = "T: BeaconChainTypes"))] +pub struct AvailableBlock { + block: Arc>, + blobs: Blobs, } -impl AvailableBlock { - pub fn blobs(&self) -> Option>> { +impl AvailableBlock { + pub fn blobs(&self) -> Option>> { match &self.blobs { Blobs::NotRequired | Blobs::None => None, Blobs::Available(block_sidecar) => { @@ -290,7 +207,7 @@ impl AvailableBlock { } } - pub fn deconstruct(self) -> (Arc>, Option>>) { + pub fn deconstruct(self) -> (Arc>, Option>>) { match self.blobs { Blobs::NotRequired | Blobs::None => (self.block, None), Blobs::Available(blob_sidecars) => { @@ -302,7 +219,7 @@ impl AvailableBlock { pub enum Blobs { /// These blobs are available. - Available(Arc>), + Available(VariableList>, E::MaxBlobsPerBlock>), /// This block is from outside the data availability boundary or the block is from prior /// to the eip4844 fork. NotRequired, @@ -310,19 +227,33 @@ pub enum Blobs { None, } -impl AvailabilityPendingBlock { +//TODO(sean) add block root to the availability pending block? +impl AvailabilityPendingBlock { pub fn new( - beacon_block: Arc>, + beacon_block: Arc>, block_root: Hash256, - da_check_required: DataAvailabilityCheckRequired, + chain: &BeaconChain, ) -> Result { + let data_availability_boundary = chain.data_availability_boundary(); + let da_check_required = + data_availability_boundary.map_or(DataAvailabilityCheckRequired::No, |boundary| { + if chain.epoch()? >= boundary { + DataAvailabilityCheckRequired::Yes + } else { + DataAvailabilityCheckRequired::No + } + }); + match beacon_block.as_ref() { // No data availability check required prior to Eip4844. SignedBeaconBlock::Base(_) | SignedBeaconBlock::Altair(_) | SignedBeaconBlock::Capella(_) | SignedBeaconBlock::Merge(_) => { - Ok(AvailableBlock(AvailableBlockInner::Block(beacon_block))) + Ok(AvailabilityPendingBlock { + block: beacon_block , + data_availability_handle: async{ Ok(Some(Blobs::NotRequired))} + }) } SignedBeaconBlock::Eip4844(_) => { match da_check_required { @@ -339,7 +270,10 @@ impl AvailabilityPendingBlock { ))) } DataAvailabilityCheckRequired::No => { - Ok(AvailableBlock(AvailableBlockInner::Block(beacon_block))) + AvailabilityPendingBlock { + block: beacon_block, + data_availability_handle: async{ Ok(Some(Blobs::NotRequired))} + } } } } @@ -348,11 +282,21 @@ impl AvailabilityPendingBlock { /// This function is private because an `AvailableBlock` should be /// constructed via the `into_available_block` method. + //TODO(sean) do we want this to optionally cricumvent the beacon cache? fn new_with_blobs( - beacon_block: Arc>, - blobs_sidecar: Arc>, - da_check_required: DataAvailabilityCheckRequired, + beacon_block: Arc>, + blobs_sidecar: Arc>, + chain: &BeaconChain, ) -> Result { + let data_availability_boundary = chain.data_availability_boundary(); + let da_check_required = + data_availability_boundary.map_or(DataAvailabilityCheckRequired::No, |boundary| { + if chain.epoch()? >= boundary { + DataAvailabilityCheckRequired::Yes + } else { + DataAvailabilityCheckRequired::No + } + }); match beacon_block.as_ref() { // This method shouldn't be called with a pre-Eip4844 block. SignedBeaconBlock::Base(_) @@ -369,7 +313,10 @@ impl AvailabilityPendingBlock { DataAvailabilityCheckRequired::No => { // Blobs were not verified so we drop them, we'll instead just pass around // an available `Eip4844` block without blobs. - Ok(AvailableBlock(AvailableBlockInner::Block(beacon_block))) + Ok(AvailableBlock{ + block: beacon_block, + blobs: Blobs::NotRequired + }) } } } diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 449177254f..595acf1b4d 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -93,7 +93,6 @@ use std::time::Duration; use store::{Error as DBError, HotStateSummary, KeyValueStore, StoreOp}; use task_executor::JoinHandle; use tree_hash::TreeHash; -use types::signed_beacon_block::BlobReconstructionError; use types::ExecPayload; use types::{ BeaconBlockRef, BeaconState, BeaconStateError, BlindedPayload, ChainSpec, CloneConfig, Epoch, @@ -491,12 +490,6 @@ impl From for BlockError { } } -impl From for BlockError { - fn from(e: BlobReconstructionError) -> Self { - BlockError::BlobValidation(BlobError::from(e)) - } -} - /// Stores information about verifying a payload against an execution engine. pub struct PayloadVerificationOutcome { pub payload_verification_status: PayloadVerificationStatus, @@ -634,7 +627,7 @@ pub fn signature_verify_chain_segment( #[derive(Derivative)] #[derivative(Debug(bound = "T: BeaconChainTypes"))] pub struct GossipVerifiedBlock { - pub block: AvailabilityPendingBlock, + pub block: AvailabilityPendingBlock, pub block_root: Hash256, parent: Option>, consensus_context: ConsensusContext, diff --git a/beacon_node/beacon_chain/src/gossip_blob_cache.rs b/beacon_node/beacon_chain/src/gossip_blob_cache.rs new file mode 100644 index 0000000000..9243234e77 --- /dev/null +++ b/beacon_node/beacon_chain/src/gossip_blob_cache.rs @@ -0,0 +1,111 @@ +use std::collections::{BTreeMap, HashMap}; +use kzg::KzgCommitment; +use ssz_types::VariableList; +use types::blob_sidecar::{BlobIdentifier, BlobSidecar}; +use types::{EthSpec, Hash256}; +use crate::blob_verification::verify_data_availability; + +/// Only need to put when we get a blob +/// Only need to get when we have a block we want to verify +pub struct GossipBlobCache { + sender: tokio::sync::mpsc::Sender>, + thread: tokio::task::JoinHandle<()>, +} + +pub enum Operation { + DataAvailabilityCheck(DataAvailabilityRequest), + Put(BlobSidecar), +} + +pub struct DataAvailabilityRequest { + block_root: Hash256, + kzg_commitments: VariableList, + sender: oneshot_broadcast::Sender, T::MaxBlobsPerBlock>>, +} + +impl GossipBlobCache { + pub fn new() -> Self { + //TODO(sean) figure out capacity + + let (tx, mut rx) = tokio::sync::mpsc::channel::>(1000); + + + let thread = tokio::task::spawn(async move || { + let mut unverified_blobs: BTreeMap> = BTreeMap::new(); + let mut verified_blobs: HashMap, T::MaxBlobsPerBlock>>= HashMap::new(); + let mut requests: HashMap> = HashMap::new(); + while let Some(op) = rx.recv().await { + // check if we already have a verified set of blobs for this, if so ignore + // check if we can complete a set of blobs and verify + // -- if yes, do it, then check if there are outstanding requests we can resolve, and resolve them + // -- -- spawn a thread that does verification + // -- if no, add to unverified blobs + + match op { + Operation::Put(blob) => { + let blob_id = blob.id(); + if !unverified_blobs.contains_key(&blob_id) { + unverified_blobs.insert(blob_id, blob) + } + + if !verified_blobs.contains_key(&blob.block_root) { + // ignore + if let Some(request) = requests.get(&blob.block_root) { + let expected_blob_count = request.kzg_commitments.len(); + + let mut blobs = unverified_blobs.range(BlobIdentifier::new(blob.block_root, 0)..BlobIdentifier::new(blob.block_root, expected_blob_count as u64)); + + for (index, (_, blob)) in blobs.enumerate() { + // find missing blobs and trigger a request + } + + verify_data_availability(blob, request.kzg_commitments); + verified_blobs.put(blob.block_root, blob); + + request.sender.send(result) + } + // check if the request can be completed, and if so complete it + } + } + Operation::DataAvailabilityCheck(request) => { + if let Some(verified_blobs) = verified_blobs.get(&blob.block_root) { + request.sender.send(result) + } else { + requests.insert(request.block_root, request) + } + } + Operation::GetBlobById(id) => { + unverified_blobs.get(id) + } + Operation::GetBlobsByBlockRoot((root, count)) => { + + } + } + + } + }); + Self { + sender: tx, + thread, + } + } + + pub fn put(&self, blob: BlobSidecar) { + self.sender.send(Operation::Put(blob)); + } + + pub async fn get_verified(&self, block_root: Hash256, kzg_commitments: VariableList) -> Receiever, T::MaxBlobsPerBlock>> { + // check if there are verified blobs + // if not, check if not check if there's a request for this block already. + // -- if yes, await the join handle return + // -- if no, create new request entry (spawn a new thread?) + let (tx, rx) = tokio::sync::oneshot::channel(); + let req = DataAvailabilityRequest { + block_root, + kzg_commitments, + sender: tx, + }; + self.sender.send(Operation::DataAvailabilityCheck(req)); + rx + } +} diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index fc86686bfa..74b641f352 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -51,6 +51,7 @@ pub mod test_utils; mod timeout_rw_lock; pub mod validator_monitor; pub mod validator_pubkey_cache; +pub mod gossip_blob_cache; pub use self::beacon_chain::{ AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, BeaconStore, ChainSegmentResult, diff --git a/consensus/types/src/blob_sidecar.rs b/consensus/types/src/blob_sidecar.rs index 169c570d29..d4a323f4a3 100644 --- a/consensus/types/src/blob_sidecar.rs +++ b/consensus/types/src/blob_sidecar.rs @@ -3,9 +3,9 @@ use crate::{Blob, EthSpec, Hash256, SignedRoot, Slot}; use derivative::Derivative; use kzg::{KzgCommitment, KzgProof}; use serde_derive::{Deserialize, Serialize}; +use bls::Signature; use ssz::Encode; use ssz_derive::{Decode, Encode}; -use ssz_types::VariableList; use test_random_derive::TestRandom; use tree_hash_derive::TreeHash; @@ -52,6 +52,10 @@ pub type BlobSidecarList = VariableList, ::MaxBl impl SignedRoot for BlobSidecar {} impl BlobSidecar { + pub fn id(&self) -> BlobIdentifier { + BlobIdentifier::new(self.block_root, self.index) + } + pub fn empty() -> Self { Self::default() } diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index 14f06bb51d..20ef091695 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -25,6 +25,7 @@ pub mod beacon_block_body; pub mod beacon_block_header; pub mod beacon_committee; pub mod beacon_state; +pub mod blob_sidecar; pub mod bls_to_execution_change; pub mod builder_bid; pub mod chain_spec; @@ -219,6 +220,7 @@ pub type BLSFieldElement = Uint256; pub type Blob = FixedVector::BytesPerBlob>; pub type VersionedHash = Hash256; pub type Hash64 = ethereum_types::H64; +pub type BlobsSidecar = VariableList, ::MaxBlobsPerBlock>; pub use bls::{ AggregatePublicKey, AggregateSignature, Keypair, PublicKey, PublicKeyBytes, SecretKey, @@ -229,3 +231,4 @@ pub use kzg::{KzgCommitment, KzgProof}; pub use ssz_types::{typenum, typenum::Unsigned, BitList, BitVector, FixedVector, VariableList}; pub use superstruct::superstruct; +use crate::blob_sidecar::BlobSidecar; diff --git a/consensus/types/src/signed_beacon_block.rs b/consensus/types/src/signed_beacon_block.rs index ae59690bf2..17651775da 100644 --- a/consensus/types/src/signed_beacon_block.rs +++ b/consensus/types/src/signed_beacon_block.rs @@ -35,14 +35,6 @@ impl From for Hash256 { } } -#[derive(Debug)] -pub enum BlobReconstructionError { - /// No blobs for the specified block where we would expect blobs. - UnavailableBlobs, - /// Blobs provided for a pre-Eip4844 fork. - InconsistentFork, -} - /// A `BeaconBlock` and a signature from its proposer. #[superstruct( variants(Base, Altair, Merge, Capella, Eip4844),