More deneb cleanup (#4640)

* remove protoc and token from network tests github action

* delete unused beacon chain methods

* downgrade writing blobs to store log

* reduce diff in block import logic

* remove some todo's and deneb built in network

* remove unnecessary error, actually use some added metrics

* remove some metrics, fix missing components on publish funcitonality

* fix status tests

* rename sidecar by root to blobs by root

* clean up some metrics

* remove unnecessary feature gate from attestation subnet tests, clean up blobs by range response code

* pawan's suggestion in `protocol_info`, peer score in matching up batch sync block and blobs

* fix range tests for deneb

* pub block and blob db cache behind the same mutex

* remove unused errs and an empty file

* move sidecar trait to new file

* move types from payload to eth2 crate

* update comment and add flag value name

* make function private again, remove allow unused

* use reth rlp for tx decoding

* fix compile after merge

* rename kzg commitments

* cargo fmt

* remove unused dep

* Update beacon_node/execution_layer/src/lib.rs

Co-authored-by: Pawan Dhananjay <pawandhananjay@gmail.com>

* Update beacon_node/beacon_processor/src/lib.rs

Co-authored-by: Pawan Dhananjay <pawandhananjay@gmail.com>

* pawan's suggestiong for vec capacity

* cargo fmt

* Revert "use reth rlp for tx decoding"

This reverts commit 5181837d81.

* remove reth rlp

---------

Co-authored-by: Pawan Dhananjay <pawandhananjay@gmail.com>
This commit is contained in:
realbigsean
2023-08-20 21:17:17 -04:00
committed by GitHub
parent 4898430330
commit 7d468cb487
66 changed files with 805 additions and 871 deletions

View File

@@ -118,10 +118,10 @@ use store::{
use task_executor::{ShutdownReason, TaskExecutor};
use tokio_stream::Stream;
use tree_hash::TreeHash;
use types::beacon_block_body::{from_block_kzg_commitments, to_block_kzg_commitments};
use types::beacon_block_body::from_block_kzg_commitments;
use types::beacon_state::CloneConfig;
use types::blob_sidecar::{BlobItems, BlobSidecarList, FixedBlobSidecarList};
use types::consts::deneb::MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS;
use types::blob_sidecar::{BlobSidecarList, FixedBlobSidecarList};
use types::sidecar::BlobItems;
use types::*;
pub type ForkChoiceError = fork_choice::Error<crate::ForkChoiceStoreError>;
@@ -186,7 +186,7 @@ pub enum WhenSlotSkipped {
Prev,
}
#[derive(Debug, PartialEq)]
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum AvailabilityProcessingStatus {
MissingComponents(Slot, Hash256),
Imported(Hash256),
@@ -1175,17 +1175,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// Returns the blobs at the given root, if any.
///
/// Returns `Ok(None)` if the blobs and associated block are not found.
///
/// If we can find the corresponding block in our database, we know whether we *should* have
/// blobs. If we should have blobs and no blobs are found, this will error. If we shouldn't,
/// this will reconstruct an empty `BlobsSidecar`.
///
/// ## Errors
/// - any database read errors
/// - block and blobs are inconsistent in the database
/// - this method is called with a pre-deneb block root
/// - this method is called for a blob that is beyond the prune depth
/// May return a database error.
pub fn get_blobs(&self, block_root: &Hash256) -> Result<BlobSidecarList<T::EthSpec>, Error> {
match self.store.get_blobs(block_root)? {
Some(blobs) => Ok(blobs),
@@ -2017,7 +2008,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
blob_sidecar: SignedBlobSidecar<T::EthSpec>,
subnet_id: u64,
) -> Result<GossipVerifiedBlob<T>, GossipBlobError<T::EthSpec>> {
blob_verification::validate_blob_sidecar_for_gossip(blob_sidecar, subnet_id, self)
metrics::inc_counter(&metrics::BLOBS_SIDECAR_PROCESSING_REQUESTS);
let _timer = metrics::start_timer(&metrics::BLOBS_SIDECAR_GOSSIP_VERIFICATION_TIMES);
blob_verification::validate_blob_sidecar_for_gossip(blob_sidecar, subnet_id, self).map(
|v| {
metrics::inc_counter(&metrics::BLOBS_SIDECAR_PROCESSING_SUCCESSES);
v
},
)
}
/// Accepts some 'LightClientOptimisticUpdate' from the network and attempts to verify it
@@ -2798,9 +2796,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// Returns `Ok(block_root)` if the given `unverified_block` was successfully verified and
/// imported into the chain.
///
/// For post deneb blocks, this returns a `BlockError::AvailabilityPending` error
/// if the corresponding blobs are not in the required caches.
///
/// Items that implement `IntoExecutionPendingBlock` include:
///
/// - `SignedBeaconBlock`
@@ -2824,26 +2819,80 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Increment the Prometheus counter for block processing requests.
metrics::inc_counter(&metrics::BLOCK_PROCESSING_REQUESTS);
let block_slot = unverified_block.block().slot();
// A small closure to group the verification and import errors.
let chain = self.clone();
let import_block = async move {
let execution_pending = unverified_block.into_execution_pending_block(
block_root,
&chain,
notify_execution_layer,
)?;
publish_fn()?;
let executed_block = chain.into_executed_block(execution_pending).await?;
match executed_block {
ExecutedBlock::Available(block) => {
self.import_available_block(Box::new(block)).await
}
ExecutedBlock::AvailabilityPending(block) => {
self.check_block_availability_and_import(block).await
}
}
};
let execution_pending = unverified_block.into_execution_pending_block(
block_root,
&chain,
notify_execution_layer,
)?;
// Verify and import the block.
match import_block.await {
// The block was successfully verified and imported. Yay.
Ok(status @ AvailabilityProcessingStatus::Imported(block_root)) => {
trace!(
self.log,
"Beacon block imported";
"block_root" => ?block_root,
"block_slot" => block_slot,
);
publish_fn()?;
// Increment the Prometheus counter for block processing successes.
metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES);
let executed_block = self
.clone()
.into_executed_block(execution_pending)
.await
.map_err(|e| self.handle_block_error(e))?;
Ok(status)
}
Ok(status @ AvailabilityProcessingStatus::MissingComponents(slot, block_root)) => {
trace!(
self.log,
"Beacon block awaiting blobs";
"block_root" => ?block_root,
"block_slot" => slot,
);
match executed_block {
ExecutedBlock::Available(block) => self.import_available_block(Box::new(block)).await,
ExecutedBlock::AvailabilityPending(block) => {
self.check_block_availability_and_import(block).await
Ok(status)
}
Err(e @ BlockError::BeaconChainError(BeaconChainError::TokioJoin(_))) => {
debug!(
self.log,
"Beacon block processing cancelled";
"error" => ?e,
);
Err(e)
}
// There was an error whilst attempting to verify and import the block. The block might
// be partially verified or partially imported.
Err(BlockError::BeaconChainError(e)) => {
crit!(
self.log,
"Beacon block processing error";
"error" => ?e,
);
Err(BlockError::BeaconChainError(e))
}
// The block failed verification.
Err(other) => {
debug!(
self.log,
"Beacon block rejected";
"reason" => other.to_string(),
);
Err(other)
}
}
}
@@ -2903,35 +2952,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
))
}
fn handle_block_error(&self, e: BlockError<T::EthSpec>) -> BlockError<T::EthSpec> {
match e {
e @ BlockError::BeaconChainError(BeaconChainError::TokioJoin(_)) => {
debug!(
self.log,
"Beacon block processing cancelled";
"error" => ?e,
);
e
}
BlockError::BeaconChainError(e) => {
crit!(
self.log,
"Beacon block processing error";
"error" => ?e,
);
BlockError::BeaconChainError(e)
}
other => {
trace!(
self.log,
"Beacon block rejected";
"reason" => other.to_string(),
);
other
}
}
}
/* Import methods */
/// Checks if the block is available, and imports immediately if so, otherwise caches the block
@@ -3017,11 +3037,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
consensus_context,
} = import_data;
let slot = block.slot();
// import
let chain = self.clone();
let result = self
let block_root = self
.spawn_blocking_handle(
move || {
chain.import_block(
@@ -3037,29 +3055,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
},
"payload_verification_handle",
)
.await
.map_err(|e| {
let b = BlockError::from(e);
self.handle_block_error(b)
})?;
match result {
// The block was successfully verified and imported. Yay.
Ok(block_root) => {
trace!(
self.log,
"Beacon block imported";
"block_root" => ?block_root,
"block_slot" => slot,
);
// Increment the Prometheus counter for block processing successes.
metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES);
Ok(AvailabilityProcessingStatus::Imported(block_root))
}
Err(e) => Err(self.handle_block_error(e)),
}
.await??;
Ok(AvailabilityProcessingStatus::Imported(block_root))
}
/// Accepts a fully-verified and available block and imports it into the chain without performing any
@@ -3248,7 +3245,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
if let Some(blobs) = blobs {
if !blobs.is_empty() {
info!(
debug!(
self.log, "Writing blobs to store";
"block_root" => %block_root,
"count" => blobs.len(),
@@ -4111,10 +4108,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let proposal_epoch = proposal_slot.epoch(T::EthSpec::slots_per_epoch());
let head_block_root = cached_head.head_block_root();
let parent_block_root = cached_head.parent_block_root();
let parent_beacon_block_root = cached_head.parent_block_root();
// The proposer head must be equal to the canonical head or its parent.
if proposer_head != head_block_root && proposer_head != parent_block_root {
if proposer_head != head_block_root && proposer_head != parent_beacon_block_root {
warn!(
self.log,
"Unable to compute payload attributes";
@@ -4193,7 +4190,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Get the `prev_randao` and parent block number.
let head_block_number = cached_head.head_block_number()?;
let (prev_randao, parent_block_number) = if proposer_head == parent_block_root {
let (prev_randao, parent_block_number) = if proposer_head == parent_beacon_block_root {
(
cached_head.parent_random()?,
head_block_number.saturating_sub(1),
@@ -4206,7 +4203,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
proposer_index,
prev_randao,
parent_block_number,
parent_beacon_block_root: parent_block_root,
parent_beacon_block_root,
}))
}
@@ -4926,7 +4923,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map_err(|_| BlockProductionError::InvalidPayloadFork)?,
bls_to_execution_changes: bls_to_execution_changes.into(),
blob_kzg_commitments: kzg_commitments
.map(to_block_kzg_commitments::<T::EthSpec>)
.ok_or(BlockProductionError::InvalidPayloadFork)?,
},
}),
@@ -6283,31 +6279,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// The epoch at which we require a data availability check in block processing.
/// `None` if the `Deneb` fork is disabled.
pub fn data_availability_boundary(&self) -> Option<Epoch> {
self.spec.deneb_fork_epoch.and_then(|fork_epoch| {
self.epoch().ok().map(|current_epoch| {
std::cmp::max(
fork_epoch,
current_epoch.saturating_sub(*MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS),
)
})
})
}
/// Returns true if the given epoch lies within the da boundary and false otherwise.
pub fn block_needs_da_check(&self, block_epoch: Epoch) -> bool {
self.data_availability_boundary()
.map_or(false, |da_epoch| block_epoch >= da_epoch)
}
/// Returns `true` if we are at or past the `Deneb` fork. This will always return `false` if
/// the `Deneb` fork is disabled.
pub fn is_data_availability_check_required(&self) -> Result<bool, Error> {
let current_epoch = self.epoch()?;
Ok(self
.spec
.deneb_fork_epoch
.map(|fork_epoch| fork_epoch <= current_epoch)
.unwrap_or(false))
self.data_availability_checker.data_availability_boundary()
}
}

View File

@@ -18,7 +18,7 @@ use std::borrow::Cow;
use types::blob_sidecar::BlobIdentifier;
use types::{
BeaconState, BeaconStateError, BlobSidecar, BlobSidecarList, ChainSpec, CloneConfig, EthSpec,
Hash256, KzgCommitment, RelativeEpoch, SignedBlobSidecar, Slot,
Hash256, RelativeEpoch, SignedBlobSidecar, Slot,
};
/// An error occurred while validating a gossip blob.
@@ -172,6 +172,9 @@ impl<T: BeaconChainTypes> GossipVerifiedBlob<T> {
pub fn slot(&self) -> Slot {
self.blob.message.slot
}
pub fn proposer_index(&self) -> u64 {
self.blob.message.proposer_index
}
}
pub fn validate_blob_sidecar_for_gossip<T: BeaconChainTypes>(
@@ -497,9 +500,6 @@ impl<T: EthSpec> KzgVerifiedBlob<T> {
pub fn clone_blob(&self) -> Arc<BlobSidecar<T>> {
self.blob.clone()
}
pub fn kzg_commitment(&self) -> KzgCommitment {
self.blob.kzg_commitment
}
pub fn block_root(&self) -> Hash256 {
self.blob.block_root
}

View File

@@ -162,6 +162,13 @@ impl<E: EthSpec> ExecutedBlock<E> {
Self::AvailabilityPending(pending) => &pending.block,
}
}
pub fn block_root(&self) -> Hash256 {
match self {
ExecutedBlock::AvailabilityPending(pending) => pending.import_data.block_root,
ExecutedBlock::Available(available) => available.import_data.block_root,
}
}
}
/// A block that has completed all pre-deneb block processing checks including verification

View File

@@ -883,7 +883,6 @@ where
slasher: self.slasher.clone(),
validator_monitor: RwLock::new(validator_monitor),
genesis_backfill_slot,
//TODO(sean) should we move kzg solely to the da checker?
data_availability_checker: Arc::new(
DataAvailabilityChecker::new(slot_clock, kzg.clone(), store, self.spec)
.map_err(|e| format!("Error initializing DataAvailabiltyChecker: {:?}", e))?,

View File

@@ -24,9 +24,9 @@ use types::{BlobSidecarList, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlo
mod overflow_lru_cache;
/// The LRU Cache stores `PendingComponents` which can store up to
/// `MAX_BLOBS_PER_BLOCK = 4` blobs each. A `BlobSidecar` is 0.131256 MB. So
/// the maximum size of a `PendingComponents` is ~ 0.525024 MB. Setting this
/// to 1024 means the maximum size of the cache is ~ 0.5 GB. But the cache
/// `MAX_BLOBS_PER_BLOCK = 6` blobs each. A `BlobSidecar` is 0.131256 MB. So
/// the maximum size of a `PendingComponents` is ~ 0.787536 MB. Setting this
/// to 1024 means the maximum size of the cache is ~ 0.8 GB. But the cache
/// will target a size of less than 75% of capacity.
pub const OVERFLOW_LRU_CAPACITY: usize = 1024;
@@ -79,11 +79,10 @@ impl From<ssz::DecodeError> for AvailabilityCheckError {
}
}
/// This cache contains
/// - blobs that have been gossip verified
/// - commitments for blocks that have been gossip verified, but the commitments themselves
/// have not been verified against blobs
/// - blocks that have been fully verified and only require a data availability check
/// This includes a cache for any blocks or blobs that have been received over gossip or RPC
/// and are awaiting more components before they can be imported. Additionally the
/// `DataAvailabilityChecker` is responsible for KZG verification of block components as well as
/// checking whether a "availability check" is required at all.
pub struct DataAvailabilityChecker<T: BeaconChainTypes> {
availability_cache: Arc<OverflowLRUCache<T>>,
slot_clock: T::SlotClock,
@@ -112,18 +111,6 @@ impl<T: EthSpec> Debug for Availability<T> {
}
}
impl<T: EthSpec> Availability<T> {
/// Returns all the blob identifiers associated with an `AvailableBlock`.
/// Returns `None` if avaiability hasn't been fully satisfied yet.
pub fn get_available_blob_ids(&self) -> Option<Vec<BlobIdentifier>> {
if let Self::Available(block) = self {
Some(block.get_all_blob_ids())
} else {
None
}
}
}
impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
pub fn new(
slot_clock: T::SlotClock,
@@ -140,10 +127,13 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
})
}
/// Checks if the given block root is cached.
pub fn has_block(&self, block_root: &Hash256) -> bool {
self.availability_cache.has_block(block_root)
}
/// Checks which blob ids are still required for a given block root, taking any cached
/// components into consideration.
pub fn get_missing_blob_ids_checking_cache(
&self,
block_root: Hash256,
@@ -164,7 +154,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
) -> Option<Vec<BlobIdentifier>> {
let epoch = self.slot_clock.now()?.epoch(T::EthSpec::slots_per_epoch());
self.da_check_required(epoch).then(|| {
self.da_check_required_for_epoch(epoch).then(|| {
block_opt
.map(|block| {
block.get_filtered_blob_ids(Some(block_root), |i, _| {
@@ -194,6 +184,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
self.availability_cache.peek_blob(blob_id)
}
/// Put a list of blobs received via RPC into the availability cache. This performs KZG
/// verification on the blobs in the list.
pub fn put_rpc_blobs(
&self,
block_root: Hash256,
@@ -232,8 +224,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.put_kzg_verified_blobs(kzg_verified_blob.block_root(), &[kzg_verified_blob])
}
/// Check if we have all the blobs for a block. If we do, return the Availability variant that
/// triggers import of the block.
/// Check if we have all the blobs for a block. Returns `Availability` which has information
/// about whether all components have been received or more are required.
pub fn put_pending_executed_block(
&self,
executed_block: AvailabilityPendingExecutedBlock<T::EthSpec>,
@@ -282,7 +274,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
/// Determines the blob requirements for a block. Answers the question: "Does this block require
/// blobs?".
fn blobs_required_for_block(&self, block: &SignedBeaconBlock<T::EthSpec>) -> bool {
let block_within_da_period = self.da_check_required(block.epoch());
let block_within_da_period = self.da_check_required_for_epoch(block.epoch());
let block_has_kzg_commitments = block
.message()
.body()
@@ -308,7 +300,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
}
/// Returns true if the given epoch lies within the da boundary and false otherwise.
pub fn da_check_required(&self, block_epoch: Epoch) -> bool {
pub fn da_check_required_for_epoch(&self, block_epoch: Epoch) -> bool {
self.data_availability_boundary()
.map_or(false, |da_epoch| block_epoch >= da_epoch)
}

View File

@@ -270,11 +270,6 @@ pub enum BlockProductionError {
BlockingFailed(execution_layer::Error),
TerminalPoWBlockLookupFailed(execution_layer::Error),
GetPayloadFailed(execution_layer::Error),
GetBlobsFailed(execution_layer::Error),
BlobPayloadMismatch {
blob_block_hash: ExecutionBlockHash,
payload_block_hash: ExecutionBlockHash,
},
FailedToReadFinalizedBlock(store::Error),
MissingFinalizedBlock(Hash256),
BlockTooLarge(usize),
@@ -283,8 +278,7 @@ pub enum BlockProductionError {
MissingSyncAggregate,
MissingExecutionPayload,
MissingKzgCommitment(String),
MissingKzgProof(String),
TokioJoin(tokio::task::JoinError),
TokioJoin(JoinError),
BeaconChain(BeaconChainError),
InvalidPayloadFork,
TrustedSetupNotInitialized,

View File

@@ -89,10 +89,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Ok(0);
}
let n_blobs_to_import = blocks_to_import
let n_blobs_lists_to_import = blocks_to_import
.iter()
.map(|available_block| available_block.blobs().map_or(0, |blobs| blobs.len()))
.sum::<usize>();
.filter(|available_block| available_block.blobs().is_some())
.count();
let mut expected_block_root = anchor_info.oldest_block_parent;
let mut prev_block_slot = anchor_info.oldest_block_slot;
@@ -100,7 +100,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
ChunkWriter::<BlockRoots, _, _>::new(&self.store.cold_db, prev_block_slot.as_usize())?;
let mut cold_batch = Vec::with_capacity(blocks_to_import.len());
let mut hot_batch = Vec::with_capacity(blocks_to_import.len() + n_blobs_to_import);
let mut hot_batch = Vec::with_capacity(blocks_to_import.len() + n_blobs_lists_to_import);
let mut signed_blocks = Vec::with_capacity(blocks_to_import.len());
for available_block in blocks_to_import.into_iter().rev() {

View File

@@ -186,7 +186,7 @@ async fn state_advance_timer<T: BeaconChainTypes>(
head_slot,
}) => debug!(
log,
"Refused to advance head state. Chain may be syncing or lagging too far behind";
"Refused to advance head state";
"head_slot" => head_slot,
"current_slot" => current_slot,
),