Builder flow for Deneb & Blobs (#4428)

* Add Deneb builder flow types with generics

* Update validator client `get_blinded_blocks` call to support Deneb

* `produceBlindedBlock` endpoint updates:
- Handle new Deneb BuilderBid response from builder endpoint (new BlindedBlobsBundle type)
- Build BlockContents response (containing kzg_commitments, proof and blinded_blob_sidecars)

* Appease Clippy lint

* Partial implementation of submit blinded block & blobs. Refactor existing `BlobSidecar` related types to support blinded blobs.

* Add associated types for BlockProposal

* Rename `AbstractSidecar` to `Sidecar`

* Remove blob cache as it's no longer necessary

* Remove unnecessary enum variant

* Clean up

* Hanlde unblinded blobs and publish full block contents

* Fix tests

* Add local EL blobs caching in blinded flow

* Remove BlockProposal and move associated Sidecar trait to AbstractExecPayload to simplify changes

* add blob roots associated type

* move raw blobs associated type to sidecar trait

* Fix todos and improve error handling

* Consolidate BlobsBundle from `execution_layer` into `consensus/types`

* Rename RawBlobs, Blobs, and BlobRoots

* Use `BlobRoots` type alias

* Update error message.

Co-authored-by: realbigsean <seananderson33@GMAIL.com>

* update builder bid type

# Conflicts:
#	consensus/types/src/builder_bid.rs

* Fix lint

* remove generic from builder bid

---------

Co-authored-by: realbigsean <seananderson33@gmail.com>
This commit is contained in:
Jimmy Chen
2023-08-10 23:32:49 +10:00
committed by GitHub
parent fddd4e4c87
commit 0b7a426946
32 changed files with 1027 additions and 499 deletions

View File

@@ -7,7 +7,6 @@ use crate::attester_cache::{AttesterCache, AttesterCacheKey};
use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckEarlyAttesterCache};
use crate::beacon_proposer_cache::compute_proposer_duties_from_head;
use crate::beacon_proposer_cache::BeaconProposerCache;
use crate::blob_cache::BlobCache;
use crate::blob_verification::{self, GossipBlobError, GossipVerifiedBlob};
use crate::block_times_cache::BlockTimesCache;
use crate::block_verification::POS_PANDA_BANNER;
@@ -67,8 +66,10 @@ use crate::validator_monitor::{
HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS,
};
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use crate::{kzg_utils, AvailabilityPendingExecutedBlock};
use crate::{metrics, BeaconChainError, BeaconForkChoiceStore, BeaconSnapshot, CachedHead};
use crate::{
kzg_utils, metrics, AvailabilityPendingExecutedBlock, BeaconChainError, BeaconForkChoiceStore,
BeaconSnapshot, CachedHead,
};
use eth2::types::{EventKind, SseBlock, SseExtendedPayloadAttributes, SyncDuty};
use execution_layer::{
BlockProposalContents, BuilderParams, ChainHealth, ExecutionLayer, FailedCondition,
@@ -118,7 +119,7 @@ use task_executor::{ShutdownReason, TaskExecutor};
use tokio_stream::Stream;
use tree_hash::TreeHash;
use types::beacon_state::CloneConfig;
use types::blob_sidecar::{BlobSidecarList, FixedBlobSidecarList};
use types::blob_sidecar::{BlobItems, BlobSidecarList, FixedBlobSidecarList};
use types::consts::deneb::MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS;
use types::*;
@@ -473,12 +474,15 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub validator_monitor: RwLock<ValidatorMonitor<T::EthSpec>>,
/// The slot at which blocks are downloaded back to.
pub genesis_backfill_slot: Slot,
pub proposal_blob_cache: BlobCache<T::EthSpec>,
pub data_availability_checker: Arc<DataAvailabilityChecker<T>>,
pub kzg: Option<Arc<Kzg<<T::EthSpec as EthSpec>::Kzg>>>,
}
type BeaconBlockAndState<T, Payload> = (BeaconBlock<T, Payload>, BeaconState<T>);
type BeaconBlockAndState<T, Payload> = (
BeaconBlock<T, Payload>,
BeaconState<T>,
Option<SidecarList<T, <Payload as AbstractExecPayload<T>>::Sidecar>>,
);
impl FinalizationAndCanonicity {
pub fn is_finalized(self) -> bool {
@@ -4978,67 +4982,52 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let blobs_verification_timer =
metrics::start_timer(&metrics::BLOCK_PRODUCTION_BLOBS_VERIFICATION_TIMES);
if let (Some(blobs), Some(proofs)) = (blobs_opt, proofs_opt) {
let kzg = self
.kzg
.as_ref()
.ok_or(BlockProductionError::TrustedSetupNotInitialized)?;
let beacon_block_root = block.canonical_root();
let expected_kzg_commitments = block.body().blob_kzg_commitments().map_err(|_| {
BlockProductionError::InvalidBlockVariant(
"DENEB block does not contain kzg commitments".to_string(),
let maybe_sidecar_list = match (blobs_opt, proofs_opt) {
(Some(blobs_or_blobs_roots), Some(proofs)) => {
let expected_kzg_commitments =
block.body().blob_kzg_commitments().map_err(|_| {
BlockProductionError::InvalidBlockVariant(
"deneb block does not contain kzg commitments".to_string(),
)
})?;
if expected_kzg_commitments.len() != blobs_or_blobs_roots.len() {
return Err(BlockProductionError::MissingKzgCommitment(format!(
"Missing KZG commitment for slot {}. Expected {}, got: {}",
block.slot(),
blobs_or_blobs_roots.len(),
expected_kzg_commitments.len()
)));
}
let kzg_proofs = Vec::from(proofs);
if let Some(blobs) = blobs_or_blobs_roots.blobs() {
let kzg = self
.kzg
.as_ref()
.ok_or(BlockProductionError::TrustedSetupNotInitialized)?;
kzg_utils::validate_blobs::<T::EthSpec>(
kzg,
expected_kzg_commitments,
blobs,
&kzg_proofs,
)
.map_err(BlockProductionError::KzgError)?;
}
Some(
Sidecar::build_sidecar(
blobs_or_blobs_roots,
&block,
expected_kzg_commitments,
kzg_proofs,
)
.map_err(BlockProductionError::FailedToBuildBlobSidecars)?,
)
})?;
if expected_kzg_commitments.len() != blobs.len() {
return Err(BlockProductionError::MissingKzgCommitment(format!(
"Missing KZG commitment for slot {}. Expected {}, got: {}",
slot,
blobs.len(),
expected_kzg_commitments.len()
)));
}
let kzg_proofs = Vec::from(proofs);
kzg_utils::validate_blobs::<T::EthSpec>(
kzg.as_ref(),
expected_kzg_commitments,
&blobs,
&kzg_proofs,
)
.map_err(BlockProductionError::KzgError)?;
let blob_sidecars = BlobSidecarList::from(
blobs
.into_iter()
.enumerate()
.map(|(blob_index, blob)| {
let kzg_commitment = expected_kzg_commitments
.get(blob_index)
.expect("KZG commitment should exist for blob");
let kzg_proof = kzg_proofs
.get(blob_index)
.expect("KZG proof should exist for blob");
Ok(Arc::new(BlobSidecar {
block_root: beacon_block_root,
index: blob_index as u64,
slot,
block_parent_root: block.parent_root(),
proposer_index,
blob,
kzg_commitment: *kzg_commitment,
kzg_proof: *kzg_proof,
}))
})
.collect::<Result<Vec<_>, BlockProductionError>>()?,
);
self.proposal_blob_cache
.put(beacon_block_root, blob_sidecars);
}
_ => None,
};
drop(blobs_verification_timer);
@@ -5052,7 +5041,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
"slot" => block.slot()
);
Ok((block, state))
Ok((block, state, maybe_sidecar_list))
}
/// This method must be called whenever an execution engine indicates that a payload is

View File

@@ -1,35 +0,0 @@
use lru::LruCache;
use parking_lot::Mutex;
use types::{BlobSidecarList, EthSpec, Hash256};
pub const DEFAULT_BLOB_CACHE_SIZE: usize = 10;
/// A cache blobs by beacon block root.
pub struct BlobCache<T: EthSpec> {
blobs: Mutex<LruCache<BlobCacheId, BlobSidecarList<T>>>,
}
#[derive(Hash, PartialEq, Eq)]
struct BlobCacheId(Hash256);
impl<T: EthSpec> Default for BlobCache<T> {
fn default() -> Self {
BlobCache {
blobs: Mutex::new(LruCache::new(DEFAULT_BLOB_CACHE_SIZE)),
}
}
}
impl<T: EthSpec> BlobCache<T> {
pub fn put(
&self,
beacon_block: Hash256,
blobs: BlobSidecarList<T>,
) -> Option<BlobSidecarList<T>> {
self.blobs.lock().put(BlobCacheId(beacon_block), blobs)
}
pub fn pop(&self, root: &Hash256) -> Option<BlobSidecarList<T>> {
self.blobs.lock().pop(&BlobCacheId(*root))
}
}

View File

@@ -1,5 +1,4 @@
use crate::beacon_chain::{CanonicalHead, BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, OP_POOL_DB_KEY};
use crate::blob_cache::BlobCache;
use crate::data_availability_checker::DataAvailabilityChecker;
use crate::eth1_chain::{CachingEth1Backend, SszEth1};
use crate::eth1_finalization_cache::Eth1FinalizationCache;
@@ -889,7 +888,6 @@ where
DataAvailabilityChecker::new(slot_clock, kzg.clone(), store, self.spec)
.map_err(|e| format!("Error initializing DataAvailabiltyChecker: {:?}", e))?,
),
proposal_blob_cache: BlobCache::default(),
kzg,
};

View File

@@ -275,20 +275,22 @@ pub enum BlockProductionError {
blob_block_hash: ExecutionBlockHash,
payload_block_hash: ExecutionBlockHash,
},
NoBlobsCached,
FailedToReadFinalizedBlock(store::Error),
MissingFinalizedBlock(Hash256),
BlockTooLarge(usize),
ShuttingDown,
MissingBlobs,
MissingSyncAggregate,
MissingExecutionPayload,
MissingKzgCommitment(String),
MissingKzgProof(String),
TokioJoin(tokio::task::JoinError),
BeaconChain(BeaconChainError),
InvalidPayloadFork,
TrustedSetupNotInitialized,
InvalidBlockVariant(String),
KzgError(kzg::Error),
FailedToBuildBlobSidecars(String),
}
easy_from_to!(BlockProcessingError, BlockProductionError);

View File

@@ -7,7 +7,6 @@ mod beacon_chain;
mod beacon_fork_choice_store;
pub mod beacon_proposer_cache;
mod beacon_snapshot;
pub mod blob_cache;
pub mod blob_verification;
pub mod block_reward;
mod block_times_cache;

View File

@@ -15,7 +15,7 @@ use crate::{
StateSkipConfig,
};
use bls::get_withdrawal_credentials;
use eth2::types::BlockContentsTuple;
use eth2::types::SignedBlockContentsTuple;
use execution_layer::test_utils::generate_genesis_header;
use execution_layer::{
auth::JwtKey,
@@ -50,6 +50,7 @@ use state_processing::{
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::marker::PhantomData;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
@@ -817,9 +818,28 @@ where
&self,
state: BeaconState<E>,
slot: Slot,
) -> (BlockContentsTuple<E, BlindedPayload<E>>, BeaconState<E>) {
) -> (
SignedBlockContentsTuple<E, BlindedPayload<E>>,
BeaconState<E>,
) {
let (unblinded, new_state) = self.make_block(state, slot).await;
((unblinded.0.into(), unblinded.1), new_state)
let maybe_blinded_blob_sidecars = unblinded.1.map(|blob_sidecar_list| {
VariableList::new(
blob_sidecar_list
.into_iter()
.map(|blob_sidecar| {
let blinded_sidecar: BlindedBlobSidecar = blob_sidecar.message.into();
SignedSidecar {
message: Arc::new(blinded_sidecar),
signature: blob_sidecar.signature,
_phantom: PhantomData,
}
})
.collect(),
)
.unwrap()
});
((unblinded.0.into(), maybe_blinded_blob_sidecars), new_state)
}
/// Returns a newly created block, signed by the proposer for the given slot.
@@ -827,7 +847,7 @@ where
&self,
mut state: BeaconState<E>,
slot: Slot,
) -> (BlockContentsTuple<E, FullPayload<E>>, BeaconState<E>) {
) -> (SignedBlockContentsTuple<E, FullPayload<E>>, BeaconState<E>) {
assert_ne!(slot, 0, "can't produce a block at slot 0");
assert!(slot >= state.slot());
@@ -845,7 +865,7 @@ where
let randao_reveal = self.sign_randao_reveal(&state, proposer_index, slot);
let (block, state) = self
let (block, state, maybe_blob_sidecars) = self
.chain
.produce_block_on_state(
state,
@@ -865,18 +885,14 @@ where
&self.spec,
);
let block_contents: BlockContentsTuple<E, FullPayload<E>> = match &signed_block {
let block_contents: SignedBlockContentsTuple<E, FullPayload<E>> = match &signed_block {
SignedBeaconBlock::Base(_)
| SignedBeaconBlock::Altair(_)
| SignedBeaconBlock::Merge(_)
| SignedBeaconBlock::Capella(_) => (signed_block, None),
SignedBeaconBlock::Deneb(_) => {
if let Some(blobs) = self
.chain
.proposal_blob_cache
.pop(&signed_block.canonical_root())
{
let signed_blobs: SignedBlobSidecarList<E> = Vec::from(blobs)
if let Some(blobs) = maybe_blob_sidecars {
let signed_blobs: SignedSidecarList<E, BlobSidecar<E>> = Vec::from(blobs)
.into_iter()
.map(|blob| {
blob.sign(
@@ -911,7 +927,7 @@ where
&self,
mut state: BeaconState<E>,
slot: Slot,
) -> (BlockContentsTuple<E, FullPayload<E>>, BeaconState<E>) {
) -> (SignedBlockContentsTuple<E, FullPayload<E>>, BeaconState<E>) {
assert_ne!(slot, 0, "can't produce a block at slot 0");
assert!(slot >= state.slot());
@@ -931,7 +947,7 @@ where
let pre_state = state.clone();
let (block, state) = self
let (block, state, maybe_blob_sidecars) = self
.chain
.produce_block_on_state(
state,
@@ -951,18 +967,14 @@ where
&self.spec,
);
let block_contents: BlockContentsTuple<E, FullPayload<E>> = match &signed_block {
let block_contents: SignedBlockContentsTuple<E, FullPayload<E>> = match &signed_block {
SignedBeaconBlock::Base(_)
| SignedBeaconBlock::Altair(_)
| SignedBeaconBlock::Merge(_)
| SignedBeaconBlock::Capella(_) => (signed_block, None),
SignedBeaconBlock::Deneb(_) => {
if let Some(blobs) = self
.chain
.proposal_blob_cache
.pop(&signed_block.canonical_root())
{
let signed_blobs: SignedBlobSidecarList<E> = Vec::from(blobs)
if let Some(blobs) = maybe_blob_sidecars {
let signed_blobs: SignedSidecarList<E, BlobSidecar<E>> = Vec::from(blobs)
.into_iter()
.map(|blob| {
blob.sign(
@@ -1778,7 +1790,7 @@ where
state: BeaconState<E>,
slot: Slot,
block_modifier: impl FnOnce(&mut BeaconBlock<E>),
) -> (BlockContentsTuple<E, FullPayload<E>>, BeaconState<E>) {
) -> (SignedBlockContentsTuple<E, FullPayload<E>>, BeaconState<E>) {
assert_ne!(slot, 0, "can't produce a block at slot 0");
assert!(slot >= state.slot());
@@ -1876,7 +1888,7 @@ where
&self,
slot: Slot,
block_root: Hash256,
block_contents: BlockContentsTuple<E, FullPayload<E>>,
block_contents: SignedBlockContentsTuple<E, FullPayload<E>>,
) -> Result<SignedBeaconBlockHash, BlockError<E>> {
self.set_current_slot(slot);
let (block, blobs) = block_contents;
@@ -1906,7 +1918,7 @@ where
pub async fn process_block_result(
&self,
block_contents: BlockContentsTuple<E, FullPayload<E>>,
block_contents: SignedBlockContentsTuple<E, FullPayload<E>>,
) -> Result<SignedBeaconBlockHash, BlockError<E>> {
let (block, blobs) = block_contents;
// Note: we are just dropping signatures here and skipping signature verification.
@@ -1991,7 +2003,7 @@ where
) -> Result<
(
SignedBeaconBlockHash,
BlockContentsTuple<E, FullPayload<E>>,
SignedBlockContentsTuple<E, FullPayload<E>>,
BeaconState<E>,
),
BlockError<E>,

View File

@@ -126,6 +126,7 @@ async fn get_chain_segment_with_signed_blobs() -> (
.get(&BlobSignatureKey::new(block_root, blob_index))
.unwrap()
.clone(),
_phantom: PhantomData,
}
})
.collect::<Vec<_>>();