mirror of
https://github.com/sigp/lighthouse.git
synced 2026-06-17 10:48:28 +00:00
Merge branch 'too-many-blob-branches' into partial-processing
This commit is contained in:
@@ -8,7 +8,8 @@ use crate::beacon_proposer_cache::compute_proposer_duties_from_head;
|
||||
use crate::beacon_proposer_cache::BeaconProposerCache;
|
||||
use crate::blob_cache::BlobCache;
|
||||
use crate::blob_verification::{
|
||||
AsBlock, AvailableBlock, BlobError, Blobs, BlockWrapper, IntoAvailableBlock,
|
||||
self, AsBlock, AvailableBlock, BlobError, BlockWrapper, GossipVerifiedBlob, IntoAvailableBlock,
|
||||
VerifiedBlobs,
|
||||
};
|
||||
use crate::block_times_cache::BlockTimesCache;
|
||||
use crate::block_verification::{
|
||||
@@ -25,6 +26,7 @@ use crate::eth1_finalization_cache::{Eth1FinalizationCache, Eth1FinalizationData
|
||||
use crate::events::ServerSentEventHandler;
|
||||
use crate::execution_payload::{get_execution_payload, NotifyExecutionLayer, PreparePayloadHandle};
|
||||
use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx, ForkChoiceWaitResult};
|
||||
use crate::gossip_blob_cache::{Availability, AvailabilityCheckError, DataAvailabilityChecker};
|
||||
use crate::head_tracker::HeadTracker;
|
||||
use crate::historical_blocks::HistoricalBlockError;
|
||||
use crate::kzg_utils;
|
||||
@@ -76,6 +78,7 @@ use futures::channel::mpsc::Sender;
|
||||
use itertools::process_results;
|
||||
use itertools::Itertools;
|
||||
use kzg::Kzg;
|
||||
use oneshot_broadcast::Receiver;
|
||||
use operation_pool::{AttestationRef, OperationPool, PersistedOperationPool, ReceivedPreCapella};
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use proto_array::{CountUnrealizedFull, DoNotReOrg, ProposerHeadError};
|
||||
@@ -112,7 +115,9 @@ use store::{
|
||||
use task_executor::{ShutdownReason, TaskExecutor};
|
||||
use tokio::task::JoinHandle;
|
||||
use tree_hash::TreeHash;
|
||||
use types::beacon_block_body::KzgCommitments;
|
||||
use types::beacon_state::CloneConfig;
|
||||
use types::blob_sidecar::{BlobIdentifier, BlobSidecarArcList, Blobs};
|
||||
use types::consts::eip4844::MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS;
|
||||
use types::consts::merge::INTERVALS_PER_SLOT;
|
||||
use types::*;
|
||||
@@ -185,6 +190,25 @@ pub enum WhenSlotSkipped {
|
||||
Prev,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum AvailabilityProcessingStatus {
|
||||
PendingBlobs(Vec<BlobIdentifier>),
|
||||
PendingBlock(Hash256),
|
||||
Imported(Hash256),
|
||||
}
|
||||
|
||||
//TODO(sean) using this in tests for now
|
||||
impl TryInto<SignedBeaconBlockHash> for AvailabilityProcessingStatus {
|
||||
type Error = ();
|
||||
|
||||
fn try_into(self) -> Result<SignedBeaconBlockHash, Self::Error> {
|
||||
match self {
|
||||
AvailabilityProcessingStatus::Imported(hash) => Ok(hash.into()),
|
||||
_ => Err(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The result of a chain segment processing.
|
||||
pub enum ChainSegmentResult<T: EthSpec> {
|
||||
/// Processing this chain segment finished successfully.
|
||||
@@ -274,11 +298,6 @@ pub enum StateSkipConfig {
|
||||
WithoutStateRoots,
|
||||
}
|
||||
|
||||
pub enum BlockProcessingResult<T: BeaconChainTypes> {
|
||||
Verified(Hash256),
|
||||
AvailabilityPending(ExecutedBlock<T>),
|
||||
}
|
||||
|
||||
pub trait BeaconChainTypes: Send + Sync + 'static {
|
||||
type HotStore: store::ItemStore<Self::EthSpec>;
|
||||
type ColdStore: store::ItemStore<Self::EthSpec>;
|
||||
@@ -440,7 +459,8 @@ pub struct BeaconChain<T: BeaconChainTypes> {
|
||||
pub slasher: Option<Arc<Slasher<T::EthSpec>>>,
|
||||
/// Provides monitoring of a set of explicitly defined validators.
|
||||
pub validator_monitor: RwLock<ValidatorMonitor<T::EthSpec>>,
|
||||
pub blob_cache: BlobCache<T::EthSpec>,
|
||||
pub proposal_blob_cache: BlobCache<T::EthSpec>,
|
||||
pub data_availability_checker: DataAvailabilityChecker<T::EthSpec>,
|
||||
pub kzg: Option<Arc<Kzg>>,
|
||||
}
|
||||
|
||||
@@ -971,35 +991,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
pub async fn get_block_and_blobs_checking_early_attester_cache(
|
||||
&self,
|
||||
block_root: &Hash256,
|
||||
) -> Result<Option<SignedBeaconBlockAndBlobsSidecar<T::EthSpec>>, Error> {
|
||||
// If there is no data availability boundary, the Eip4844 fork is disabled.
|
||||
if let Some(finalized_data_availability_boundary) =
|
||||
self.finalized_data_availability_boundary()
|
||||
{
|
||||
// Only use the attester cache if we can find both the block and blob
|
||||
if let (Some(block), Some(blobs)) = (
|
||||
self.early_attester_cache.get_block(*block_root),
|
||||
self.early_attester_cache.get_blobs(*block_root),
|
||||
) {
|
||||
Ok(Some(SignedBeaconBlockAndBlobsSidecar {
|
||||
beacon_block: block,
|
||||
blobs_sidecar: blobs,
|
||||
}))
|
||||
// Attempt to get the block and blobs from the database
|
||||
} else if let Some(block) = self.get_block(block_root).await?.map(Arc::new) {
|
||||
let blobs = self
|
||||
.get_blobs(block_root, finalized_data_availability_boundary)?
|
||||
.map(Arc::new);
|
||||
Ok(blobs.map(|blobs| SignedBeaconBlockAndBlobsSidecar {
|
||||
beacon_block: block,
|
||||
blobs_sidecar: blobs,
|
||||
}))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
) -> Result<Option<()>, Error> {
|
||||
//TODO(sean) use the rpc blobs cache and revert this to the current block cache logic
|
||||
Ok(Some(()))
|
||||
}
|
||||
|
||||
/// Returns the block at the given root, if any.
|
||||
@@ -1082,8 +1076,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
pub fn get_blobs(
|
||||
&self,
|
||||
block_root: &Hash256,
|
||||
) -> Result<Option<BlobSidecarList<T::EthSpec>>, Error> {
|
||||
self.store.get_blobs(block_root)
|
||||
) -> Result<Option<BlobSidecarArcList<T::EthSpec>>, Error> {
|
||||
Ok(self.store.get_blobs(block_root)?)
|
||||
}
|
||||
|
||||
pub fn get_blinded_block(
|
||||
@@ -1905,6 +1899,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn verify_blob_sidecar_for_gossip(
|
||||
self: &Arc<Self>,
|
||||
blob_sidecar: SignedBlobSidecar<T::EthSpec>,
|
||||
subnet_id: u64,
|
||||
) -> Result<GossipVerifiedBlob<T::EthSpec>, BlobError> // TODO(pawan): make a GossipVerifedBlob type
|
||||
{
|
||||
blob_verification::validate_blob_sidecar_for_gossip(blob_sidecar, subnet_id, self)
|
||||
}
|
||||
|
||||
/// Accepts some 'LightClientOptimisticUpdate' from the network and attempts to verify it
|
||||
pub fn verify_optimistic_update_for_gossip(
|
||||
self: &Arc<Self>,
|
||||
@@ -2656,13 +2659,29 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
.map_err(BeaconChainError::TokioJoin)?
|
||||
}
|
||||
|
||||
pub async fn process_blob(
|
||||
self: &Arc<Self>,
|
||||
blob: BlobSidecar<T::EthSpec>,
|
||||
count_unrealized: CountUnrealized,
|
||||
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
|
||||
self.check_availability_and_maybe_import(
|
||||
|chain| chain.data_availability_checker.put_blob(Arc::new(blob)),
|
||||
count_unrealized,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Returns `Ok(block_root)` if the given `unverified_block` was successfully verified and
|
||||
/// imported into the chain.
|
||||
///
|
||||
/// For post deneb blocks, this returns a `BlockError::AvailabilityPending` error
|
||||
/// if the corresponding blobs are not in the required caches.
|
||||
///
|
||||
/// Items that implement `IntoExecutionPendingBlock` include:
|
||||
///
|
||||
/// - `SignedBeaconBlock`
|
||||
/// - `GossipVerifiedBlock`
|
||||
/// - `BlockWrapper`
|
||||
///
|
||||
/// ## Errors
|
||||
///
|
||||
@@ -2674,14 +2693,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
unverified_block: B,
|
||||
count_unrealized: CountUnrealized,
|
||||
notify_execution_layer: NotifyExecutionLayer,
|
||||
) -> Result<BlockProcessingResult<T>, BlockError<T::EthSpec>> {
|
||||
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
|
||||
// Start the Prometheus timer.
|
||||
let _full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES);
|
||||
|
||||
// Increment the Prometheus counter for block processing requests.
|
||||
metrics::inc_counter(&metrics::BLOCK_PROCESSING_REQUESTS);
|
||||
|
||||
let slot = unverified_block.block().slot();
|
||||
let chain = self.clone();
|
||||
|
||||
let execution_pending = unverified_block.into_execution_pending_block(
|
||||
@@ -2692,65 +2710,20 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
|
||||
// TODO(log required errors)
|
||||
let executed_block = self
|
||||
.clone()
|
||||
.into_executed_block(execution_pending, count_unrealized)
|
||||
.await?;
|
||||
.await
|
||||
.map_err(|e| self.handle_block_error(e))?;
|
||||
|
||||
// Check if the executed block has all it's blobs available to qualify as a fully
|
||||
// available block
|
||||
let import_block = if let Ok(blobs) = self
|
||||
.gossip_blob_cache
|
||||
.lock()
|
||||
.blobs(executed_block.block_root)
|
||||
{
|
||||
self.import_available_block(executed_block, blobs, count_unrealized)
|
||||
} else {
|
||||
return Ok(BlockProcessingResult::AvailabilityPending(executed_block));
|
||||
};
|
||||
|
||||
// Verify and import the block.
|
||||
match import_block.await {
|
||||
// The block was successfully verified and imported. Yay.
|
||||
Ok(block_root) => {
|
||||
trace!(
|
||||
self.log,
|
||||
"Beacon block imported";
|
||||
"block_root" => ?block_root,
|
||||
"block_slot" => slot,
|
||||
);
|
||||
|
||||
// Increment the Prometheus counter for block processing successes.
|
||||
metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES);
|
||||
|
||||
Ok(BlockProcessingResult::Verified(block_root))
|
||||
}
|
||||
Err(e @ BlockError::BeaconChainError(BeaconChainError::TokioJoin(_))) => {
|
||||
debug!(
|
||||
self.log,
|
||||
"Beacon block processing cancelled";
|
||||
"error" => ?e,
|
||||
);
|
||||
Err(e)
|
||||
}
|
||||
// There was an error whilst attempting to verify and import the block. The block might
|
||||
// be partially verified or partially imported.
|
||||
Err(BlockError::BeaconChainError(e)) => {
|
||||
crit!(
|
||||
self.log,
|
||||
"Beacon block processing error";
|
||||
"error" => ?e,
|
||||
);
|
||||
Err(BlockError::BeaconChainError(e))
|
||||
}
|
||||
// The block failed verification.
|
||||
Err(other) => {
|
||||
trace!(
|
||||
self.log,
|
||||
"Beacon block rejected";
|
||||
"reason" => other.to_string(),
|
||||
);
|
||||
Err(other)
|
||||
}
|
||||
}
|
||||
self.check_availability_and_maybe_import(
|
||||
|chain| {
|
||||
chain
|
||||
.data_availability_checker
|
||||
.check_block_availability(executed_block)
|
||||
},
|
||||
count_unrealized,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Accepts a fully-verified block and awaits on it's payload verification handle to
|
||||
@@ -2761,7 +2734,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
self: Arc<Self>,
|
||||
execution_pending_block: ExecutionPendingBlock<T>,
|
||||
count_unrealized: CountUnrealized,
|
||||
) -> Result<ExecutedBlock<T>, BlockError<T::EthSpec>> {
|
||||
) -> Result<ExecutedBlock<T::EthSpec>, BlockError<T::EthSpec>> {
|
||||
let ExecutionPendingBlock {
|
||||
block,
|
||||
block_root,
|
||||
@@ -2819,55 +2792,118 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
})
|
||||
}
|
||||
|
||||
fn handle_block_error(&self, e: BlockError<T::EthSpec>) -> BlockError<T::EthSpec> {
|
||||
match e {
|
||||
e @ BlockError::BeaconChainError(BeaconChainError::TokioJoin(_)) => {
|
||||
debug!(
|
||||
self.log,
|
||||
"Beacon block processing cancelled";
|
||||
"error" => ?e,
|
||||
);
|
||||
e
|
||||
}
|
||||
BlockError::BeaconChainError(e) => {
|
||||
crit!(
|
||||
self.log,
|
||||
"Beacon block processing error";
|
||||
"error" => ?e,
|
||||
);
|
||||
BlockError::BeaconChainError(e)
|
||||
}
|
||||
other => {
|
||||
trace!(
|
||||
self.log,
|
||||
"Beacon block rejected";
|
||||
"reason" => other.to_string(),
|
||||
);
|
||||
other
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Accepts a fully-verified, available block and imports it into the chain without performing any
|
||||
/// additional verification.
|
||||
///
|
||||
/// An error is returned if the block was unable to be imported. It may be partially imported
|
||||
/// (i.e., this function is not atomic).
|
||||
async fn import_available_block(
|
||||
self: Arc<Self>,
|
||||
executed_block: ExecutedBlock<T>,
|
||||
blobs: Blobs<T::EthSpec>,
|
||||
pub async fn check_availability_and_maybe_import(
|
||||
self: &Arc<Self>,
|
||||
cache_fn: impl FnOnce(Arc<Self>) -> Result<Availability<T::EthSpec>, AvailabilityCheckError>,
|
||||
count_unrealized: CountUnrealized,
|
||||
) -> Result<Hash256, BlockError<T::EthSpec>> {
|
||||
let ExecutedBlock {
|
||||
block,
|
||||
block_root,
|
||||
state,
|
||||
parent_block,
|
||||
confirmed_state_roots,
|
||||
payload_verification_outcome,
|
||||
parent_eth1_finalization_data,
|
||||
consensus_context,
|
||||
} = executed_block;
|
||||
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
|
||||
let availability = cache_fn(self.clone())?;
|
||||
match availability {
|
||||
Availability::Available(block) => {
|
||||
let ExecutedBlock {
|
||||
block,
|
||||
block_root,
|
||||
state,
|
||||
parent_block,
|
||||
parent_eth1_finalization_data,
|
||||
confirmed_state_roots,
|
||||
consensus_context,
|
||||
payload_verification_outcome,
|
||||
} = block;
|
||||
|
||||
let chain = self.clone();
|
||||
let available_block = match block {
|
||||
BlockWrapper::Available(block) => block,
|
||||
BlockWrapper::AvailabilityPending(_) => {
|
||||
todo!() // logic error
|
||||
}
|
||||
};
|
||||
|
||||
let available_block = AvailableBlock {
|
||||
block: block.block_cloned(),
|
||||
blobs: blobs,
|
||||
};
|
||||
let slot = available_block.block.slot();
|
||||
|
||||
let block_hash = self
|
||||
.spawn_blocking_handle(
|
||||
move || {
|
||||
chain.import_block(
|
||||
available_block,
|
||||
block_root,
|
||||
state,
|
||||
confirmed_state_roots,
|
||||
payload_verification_outcome.payload_verification_status,
|
||||
count_unrealized,
|
||||
parent_block,
|
||||
parent_eth1_finalization_data,
|
||||
consensus_context,
|
||||
// import
|
||||
let chain = self.clone();
|
||||
let result = self
|
||||
.spawn_blocking_handle(
|
||||
move || {
|
||||
chain.import_block(
|
||||
available_block,
|
||||
block_root,
|
||||
state,
|
||||
confirmed_state_roots,
|
||||
payload_verification_outcome.payload_verification_status,
|
||||
count_unrealized,
|
||||
parent_block,
|
||||
parent_eth1_finalization_data,
|
||||
consensus_context,
|
||||
)
|
||||
},
|
||||
"payload_verification_handle",
|
||||
)
|
||||
},
|
||||
"payload_verification_handle",
|
||||
)
|
||||
.await??;
|
||||
.await
|
||||
.map_err(|e| {
|
||||
let b = BlockError::from(e);
|
||||
self.handle_block_error(b)
|
||||
})?;
|
||||
|
||||
Ok(block_hash)
|
||||
match result {
|
||||
// The block was successfully verified and imported. Yay.
|
||||
Ok(block_root) => {
|
||||
trace!(
|
||||
self.log,
|
||||
"Beacon block imported";
|
||||
"block_root" => ?block_root,
|
||||
"block_slot" => slot,
|
||||
);
|
||||
|
||||
// Increment the Prometheus counter for block processing successes.
|
||||
metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES);
|
||||
|
||||
Ok(AvailabilityProcessingStatus::Imported(block_root))
|
||||
}
|
||||
Err(e) => Err(self.handle_block_error(e)),
|
||||
}
|
||||
}
|
||||
Availability::PendingBlock(block_root) => {
|
||||
Ok(AvailabilityProcessingStatus::PendingBlock(block_root))
|
||||
}
|
||||
Availability::PendingBlobs(blob_ids) => {
|
||||
Ok(AvailabilityProcessingStatus::PendingBlobs(blob_ids))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Accepts a fully-verified and available block and imports it into the chain without performing any
|
||||
@@ -4802,12 +4838,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
.as_ref()
|
||||
.ok_or(BlockProductionError::TrustedSetupNotInitialized)?;
|
||||
let beacon_block_root = block.canonical_root();
|
||||
let expected_kzg_commitments: &KzgCommitments<T::EthSpec> =
|
||||
block.body().blob_kzg_commitments().map_err(|_| {
|
||||
BlockProductionError::InvalidBlockVariant(
|
||||
"EIP4844 block does not contain kzg commitments".to_string(),
|
||||
)
|
||||
})?;
|
||||
let expected_kzg_commitments = block.body().blob_kzg_commitments().map_err(|_| {
|
||||
BlockProductionError::InvalidBlockVariant(
|
||||
"EIP4844 block does not contain kzg commitments".to_string(),
|
||||
)
|
||||
})?;
|
||||
|
||||
if expected_kzg_commitments.len() != blobs.len() {
|
||||
return Err(BlockProductionError::MissingKzgCommitment(format!(
|
||||
@@ -4856,7 +4891,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
.collect::<Result<Vec<BlobSidecar<T::EthSpec>>, BlockProductionError>>()?,
|
||||
);
|
||||
|
||||
self.blob_cache.put(beacon_block_root, blob_sidecars);
|
||||
self.proposal_blob_cache
|
||||
.put(beacon_block_root, blob_sidecars);
|
||||
}
|
||||
|
||||
metrics::inc_counter(&metrics::BLOCK_PRODUCTION_SUCCESSES);
|
||||
@@ -4874,8 +4910,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
|
||||
fn compute_blob_kzg_proofs(
|
||||
kzg: &Arc<Kzg>,
|
||||
blobs: &Blobs<<T as BeaconChainTypes>::EthSpec>,
|
||||
expected_kzg_commitments: &KzgCommitments<<T as BeaconChainTypes>::EthSpec>,
|
||||
blobs: &Blobs<T::EthSpec>,
|
||||
expected_kzg_commitments: &KzgCommitments<T::EthSpec>,
|
||||
slot: Slot,
|
||||
) -> Result<Vec<KzgProof>, BlockProductionError> {
|
||||
blobs
|
||||
@@ -6173,49 +6209,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
.map(|fork_epoch| fork_epoch <= current_epoch)
|
||||
.unwrap_or(false))
|
||||
}
|
||||
|
||||
pub async fn check_data_availability(
|
||||
&self,
|
||||
block: Arc<SignedBeaconBlock<T::EthSpec>>,
|
||||
) -> Result<AvailableBlock<T>, Error> {
|
||||
let kzg_commitments = block
|
||||
.message()
|
||||
.body()
|
||||
.blob_kzg_commitments()
|
||||
.map_err(|_| BlobError::KzgCommitmentMissing)?;
|
||||
let transactions = block
|
||||
.message()
|
||||
.body()
|
||||
.execution_payload_eip4844()
|
||||
.map(|payload| payload.transactions())
|
||||
.map_err(|_| BlobError::TransactionsMissing)?
|
||||
.ok_or(BlobError::TransactionsMissing)?;
|
||||
|
||||
if verify_kzg_commitments_against_transactions::<T::EthSpec>(transactions, kzg_commitments)
|
||||
.is_err()
|
||||
{
|
||||
return Err(BlobError::TransactionCommitmentMismatch);
|
||||
}
|
||||
|
||||
// Validate that the kzg proof is valid against the commitments and blobs
|
||||
let kzg = self
|
||||
.kzg
|
||||
.as_ref()
|
||||
.ok_or(BlobError::TrustedSetupNotInitialized)?;
|
||||
|
||||
if !kzg_utils::validate_blobs_sidecar(
|
||||
kzg,
|
||||
block_slot,
|
||||
block_root,
|
||||
kzg_commitments,
|
||||
blob_sidecar,
|
||||
)
|
||||
.map_err(BlobError::KzgError)?
|
||||
{
|
||||
return Err(BlobError::InvalidKzgProof);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes> Drop for BeaconChain<T> {
|
||||
@@ -6266,4 +6259,4 @@ impl<T: EthSpec> ChainSegmentResult<T> {
|
||||
ChainSegmentResult::Successful { .. } => Ok(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,15 +1,21 @@
|
||||
use slot_clock::SlotClock;
|
||||
use state_processing::ConsensusContext;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::beacon_chain::{
|
||||
BeaconChain, BeaconChainTypes, MAXIMUM_GOSSIP_CLOCK_DISPARITY,
|
||||
VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT,
|
||||
};
|
||||
use crate::gossip_blob_cache::AvailabilityCheckError;
|
||||
use crate::snapshot_cache::PreProcessingSnapshot;
|
||||
use crate::BeaconChainError;
|
||||
use derivative::Derivative;
|
||||
use state_processing::per_block_processing::eip4844::eip4844::verify_kzg_commitments_against_transactions;
|
||||
use types::blob_sidecar::BlobSidecarArcList;
|
||||
use types::{
|
||||
BeaconBlockRef, BeaconStateError, BlobSidecarList, Epoch, EthSpec, Hash256, KzgCommitment,
|
||||
SignedBeaconBlock, SignedBeaconBlockHeader, SignedBlobSidecar, Slot, Transactions,
|
||||
BeaconBlockRef, BeaconStateError, BlobSidecar, BlobSidecarList, Epoch, EthSpec, Hash256,
|
||||
KzgCommitment, SignedBeaconBlock, SignedBeaconBlockHeader, SignedBlobSidecar, Slot,
|
||||
Transactions,
|
||||
};
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -107,6 +113,8 @@ pub enum BlobError {
|
||||
///
|
||||
/// The block is invalid and the peer is faulty.
|
||||
UnknownValidator(u64),
|
||||
|
||||
BlobCacheError(AvailabilityCheckError),
|
||||
}
|
||||
|
||||
impl From<BeaconChainError> for BlobError {
|
||||
@@ -121,14 +129,27 @@ impl From<BeaconStateError> for BlobError {
|
||||
}
|
||||
}
|
||||
|
||||
/// A wrapper around a `BlobSidecar` that indicates it has been approved for re-gossiping on
|
||||
/// the p2p network.
|
||||
#[derive(Debug)]
|
||||
pub struct GossipVerifiedBlob<T: EthSpec> {
|
||||
blob: BlobSidecar<T>,
|
||||
}
|
||||
|
||||
impl<T: EthSpec> GossipVerifiedBlob<T> {
|
||||
pub fn to_blob(self) -> BlobSidecar<T> {
|
||||
self.blob
|
||||
}
|
||||
}
|
||||
|
||||
pub fn validate_blob_sidecar_for_gossip<T: BeaconChainTypes>(
|
||||
blob_sidecar: SignedBlobSidecar<T::EthSpec>,
|
||||
signed_blob_sidecar: SignedBlobSidecar<T::EthSpec>,
|
||||
subnet: u64,
|
||||
chain: &BeaconChain<T>,
|
||||
) -> Result<(), BlobError> {
|
||||
let blob_slot = blob_sidecar.message.slot;
|
||||
let blob_index = blob_sidecar.message.index;
|
||||
let block_root = blob_sidecar.message.block_root;
|
||||
) -> Result<GossipVerifiedBlob<T::EthSpec>, BlobError> {
|
||||
let blob_slot = signed_blob_sidecar.message.slot;
|
||||
let blob_index = signed_blob_sidecar.message.index;
|
||||
let block_root = signed_blob_sidecar.message.block_root;
|
||||
|
||||
// Verify that the blob_sidecar was received on the correct subnet.
|
||||
if blob_index != subnet {
|
||||
@@ -167,7 +188,7 @@ pub fn validate_blob_sidecar_for_gossip<T: BeaconChainTypes>(
|
||||
|
||||
// TODO(pawan): should we verify locally that the parent root is correct
|
||||
// or just use whatever the proposer gives us?
|
||||
let proposer_shuffling_root = blob_sidecar.message.block_parent_root;
|
||||
let proposer_shuffling_root = signed_blob_sidecar.message.block_parent_root;
|
||||
|
||||
let (proposer_index, fork) = match chain
|
||||
.beacon_proposer_cache
|
||||
@@ -184,7 +205,7 @@ pub fn validate_blob_sidecar_for_gossip<T: BeaconChainTypes>(
|
||||
}
|
||||
};
|
||||
|
||||
let blob_proposer_index = blob_sidecar.message.proposer_index;
|
||||
let blob_proposer_index = signed_blob_sidecar.message.proposer_index;
|
||||
if proposer_index != blob_proposer_index as usize {
|
||||
return Err(BlobError::ProposerIndexMismatch {
|
||||
sidecar: blob_proposer_index as usize,
|
||||
@@ -203,7 +224,7 @@ pub fn validate_blob_sidecar_for_gossip<T: BeaconChainTypes>(
|
||||
.get(proposer_index as usize)
|
||||
.ok_or_else(|| BlobError::UnknownValidator(proposer_index as u64))?;
|
||||
|
||||
blob_sidecar.verify_signature(
|
||||
signed_blob_sidecar.verify_signature(
|
||||
None,
|
||||
pubkey,
|
||||
&fork,
|
||||
@@ -221,8 +242,6 @@ pub fn validate_blob_sidecar_for_gossip<T: BeaconChainTypes>(
|
||||
// TODO(pawan): Check if other blobs for the same proposer index and blob index have been
|
||||
// received and drop if required.
|
||||
|
||||
// TODO(pawan): potentially add to a seen cache at this point.
|
||||
|
||||
// Verify if the corresponding block for this blob has been received.
|
||||
// Note: this should be the last gossip check so that we can forward the blob
|
||||
// over the gossip network even if we haven't received the corresponding block yet
|
||||
@@ -233,13 +252,16 @@ pub fn validate_blob_sidecar_for_gossip<T: BeaconChainTypes>(
|
||||
.get_block(&block_root)
|
||||
.or_else(|| chain.early_attester_cache.get_proto_block(block_root)); // TODO(pawan): should we be checking this cache?
|
||||
|
||||
// TODO(pawan): this may be redundant with the new `AvailabilityProcessingStatus::PendingBlock variant`
|
||||
if block_opt.is_none() {
|
||||
return Err(BlobError::UnknownHeadBlock {
|
||||
beacon_block_root: block_root,
|
||||
});
|
||||
}
|
||||
|
||||
Ok(())
|
||||
Ok(GossipVerifiedBlob {
|
||||
blob: signed_blob_sidecar.message,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn verify_data_availability<T: BeaconChainTypes>(
|
||||
@@ -301,39 +323,45 @@ impl<T: BeaconChainTypes> IntoAvailableBlock<T> for BlockWrapper<T::EthSpec> {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
#[derive(Clone, Debug, PartialEq, Derivative)]
|
||||
#[derivative(Hash(bound = "T: EthSpec"))]
|
||||
pub struct AvailableBlock<T: EthSpec> {
|
||||
pub block: Arc<SignedBeaconBlock<T>>,
|
||||
pub blobs: Blobs<T>,
|
||||
pub blobs: VerifiedBlobs<T>,
|
||||
}
|
||||
|
||||
impl<T: EthSpec> AvailableBlock<T> {
|
||||
pub fn blobs(&self) -> Option<Arc<BlobSidecarList<T>>> {
|
||||
pub fn blobs(&self) -> Option<BlobSidecarArcList<T>> {
|
||||
match &self.blobs {
|
||||
Blobs::NotRequired | Blobs::None => None,
|
||||
Blobs::Available(blobs) => Some(blobs.clone()),
|
||||
VerifiedBlobs::EmptyBlobs | VerifiedBlobs::NotRequired | VerifiedBlobs::PreEip4844 => {
|
||||
None
|
||||
}
|
||||
VerifiedBlobs::Available(blobs) => Some(blobs.clone()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn deconstruct(self) -> (Arc<SignedBeaconBlock<T>>, Option<Arc<BlobSidecarList<T>>>) {
|
||||
pub fn deconstruct(self) -> (Arc<SignedBeaconBlock<T>>, Option<BlobSidecarArcList<T>>) {
|
||||
match self.blobs {
|
||||
Blobs::NotRequired | Blobs::None => (self.block, None),
|
||||
Blobs::Available(blobs) => (self.block, Some(blobs)),
|
||||
VerifiedBlobs::EmptyBlobs | VerifiedBlobs::NotRequired | VerifiedBlobs::PreEip4844 => {
|
||||
(self.block, None)
|
||||
}
|
||||
VerifiedBlobs::Available(blobs) => (self.block, Some(blobs)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub enum Blobs<E: EthSpec> {
|
||||
#[derive(Clone, Debug, PartialEq, Derivative)]
|
||||
#[derivative(Hash(bound = "E: EthSpec"))]
|
||||
pub enum VerifiedBlobs<E: EthSpec> {
|
||||
/// These blobs are available.
|
||||
Available(Arc<BlobSidecarList<E>>),
|
||||
Available(BlobSidecarArcList<E>),
|
||||
/// This block is from outside the data availability boundary so doesn't require
|
||||
/// a data availability check.
|
||||
NotRequired,
|
||||
/// The block's `kzg_commitments` field is empty so it does not contain any blobs.
|
||||
EmptyBlobs,
|
||||
/// This is a block prior to the 4844 fork, so doesn't require any blobs
|
||||
None,
|
||||
PreEip4844,
|
||||
}
|
||||
|
||||
pub trait AsBlock<E: EthSpec> {
|
||||
@@ -348,7 +376,8 @@ pub trait AsBlock<E: EthSpec> {
|
||||
fn canonical_root(&self) -> Hash256;
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Debug, Clone, Derivative)]
|
||||
#[derivative(Hash(bound = "E: EthSpec"))]
|
||||
pub enum BlockWrapper<E: EthSpec> {
|
||||
/// This variant is fully available.
|
||||
/// i.e. for pre-4844 blocks, it contains a (`SignedBeaconBlock`, `Blobs::None`) and for
|
||||
|
||||
@@ -54,6 +54,7 @@ use crate::execution_payload::{
|
||||
is_optimistic_candidate_block, validate_execution_payload_for_gossip, validate_merge_block,
|
||||
AllowOptimisticImport, NotifyExecutionLayer, PayloadNotifier,
|
||||
};
|
||||
use crate::gossip_blob_cache::AvailabilityCheckError;
|
||||
use crate::snapshot_cache::PreProcessingSnapshot;
|
||||
use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS;
|
||||
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
|
||||
@@ -64,6 +65,7 @@ use crate::{
|
||||
},
|
||||
metrics, BeaconChain, BeaconChainError, BeaconChainTypes,
|
||||
};
|
||||
use derivative::Derivative;
|
||||
use eth2::types::EventKind;
|
||||
use execution_layer::PayloadStatus;
|
||||
use fork_choice::{AttestationFromBlock, PayloadVerificationStatus};
|
||||
@@ -306,6 +308,7 @@ pub enum BlockError<T: EthSpec> {
|
||||
parent_root: Hash256,
|
||||
},
|
||||
BlobValidation(BlobError),
|
||||
AvailabilityCheck(AvailabilityCheckError),
|
||||
}
|
||||
|
||||
impl<T: EthSpec> From<BlobError> for BlockError<T> {
|
||||
@@ -314,6 +317,12 @@ impl<T: EthSpec> From<BlobError> for BlockError<T> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: EthSpec> From<AvailabilityCheckError> for BlockError<T> {
|
||||
fn from(e: AvailabilityCheckError) -> Self {
|
||||
Self::AvailabilityCheck(e)
|
||||
}
|
||||
}
|
||||
|
||||
/// Returned when block validation failed due to some issue verifying
|
||||
/// the execution payload.
|
||||
#[derive(Debug)]
|
||||
@@ -487,6 +496,7 @@ impl<T: EthSpec> From<ArithError> for BlockError<T> {
|
||||
}
|
||||
|
||||
/// Stores information about verifying a payload against an execution engine.
|
||||
#[derive(Clone)]
|
||||
pub struct PayloadVerificationOutcome {
|
||||
pub payload_verification_status: PayloadVerificationStatus,
|
||||
pub is_valid_merge_transition_block: bool,
|
||||
@@ -619,7 +629,8 @@ pub fn signature_verify_chain_segment<T: BeaconChainTypes>(
|
||||
|
||||
/// A wrapper around a `SignedBeaconBlock` that indicates it has been approved for re-gossiping on
|
||||
/// the p2p network.
|
||||
#[derive(Debug)]
|
||||
#[derive(Derivative)]
|
||||
#[derivative(Debug(bound = "T: BeaconChainTypes"))]
|
||||
pub struct GossipVerifiedBlock<T: BeaconChainTypes> {
|
||||
pub block: BlockWrapper<T::EthSpec>,
|
||||
pub block_root: Hash256,
|
||||
@@ -663,17 +674,24 @@ pub struct ExecutionPendingBlock<T: BeaconChainTypes> {
|
||||
pub payload_verification_handle: PayloadVerificationHandle<T::EthSpec>,
|
||||
}
|
||||
|
||||
pub struct ExecutedBlock<T: BeaconChainTypes> {
|
||||
pub block: BlockWrapper<T::EthSpec>,
|
||||
#[derive(Clone)]
|
||||
pub struct ExecutedBlock<E: EthSpec> {
|
||||
pub block: BlockWrapper<E>,
|
||||
pub block_root: Hash256,
|
||||
pub state: BeaconState<T::EthSpec>,
|
||||
pub parent_block: SignedBeaconBlock<T::EthSpec, BlindedPayload<T::EthSpec>>,
|
||||
pub state: BeaconState<E>,
|
||||
pub parent_block: SignedBeaconBlock<E, BlindedPayload<E>>,
|
||||
pub parent_eth1_finalization_data: Eth1FinalizationData,
|
||||
pub confirmed_state_roots: Vec<Hash256>,
|
||||
pub consensus_context: ConsensusContext<T::EthSpec>,
|
||||
pub consensus_context: ConsensusContext<E>,
|
||||
pub payload_verification_outcome: PayloadVerificationOutcome,
|
||||
}
|
||||
|
||||
impl<E: EthSpec> std::fmt::Debug for ExecutedBlock<E> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{:?}", self.block)
|
||||
}
|
||||
}
|
||||
|
||||
/// Implemented on types that can be converted into a `ExecutionPendingBlock`.
|
||||
///
|
||||
/// Used to allow functions to accept blocks at various stages of verification.
|
||||
@@ -1156,7 +1174,7 @@ impl<T: BeaconChainTypes> IntoExecutionPendingBlock<T> for BlockWrapper<T::EthSp
|
||||
BlockWrapper::AvailabilityPending(block) => block
|
||||
.into_execution_pending_block_slashable(block_root, chain, notify_execution_layer),
|
||||
BlockWrapper::Available(AvailableBlock { block, blobs }) => {
|
||||
let execution_pending_block = block.into_execution_pending_block_slashable(
|
||||
let mut execution_pending_block = block.into_execution_pending_block_slashable(
|
||||
block_root,
|
||||
chain,
|
||||
notify_execution_layer,
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
use crate::beacon_chain::{CanonicalHead, BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, OP_POOL_DB_KEY};
|
||||
use crate::blob_cache::BlobCache;
|
||||
use crate::block_verification::ExecutedBlock;
|
||||
use crate::eth1_chain::{CachingEth1Backend, SszEth1};
|
||||
use crate::eth1_finalization_cache::Eth1FinalizationCache;
|
||||
use crate::fork_choice_signal::ForkChoiceSignalTx;
|
||||
use crate::fork_revert::{reset_fork_choice_to_finalization, revert_to_fork_boundary};
|
||||
use crate::gossip_blob_cache::DataAvailabilityChecker;
|
||||
use crate::head_tracker::HeadTracker;
|
||||
use crate::migrate::{BackgroundMigrator, MigratorConfig};
|
||||
use crate::persisted_beacon_chain::PersistedBeaconChain;
|
||||
@@ -644,7 +646,8 @@ where
|
||||
let kzg = if let Some(trusted_setup) = self.trusted_setup {
|
||||
let kzg = Kzg::new_from_trusted_setup(trusted_setup)
|
||||
.map_err(|e| format!("Failed to load trusted setup: {:?}", e))?;
|
||||
Some(Arc::new(kzg))
|
||||
let kzg_arc = Arc::new(kzg);
|
||||
Some(kzg_arc)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
@@ -850,7 +853,9 @@ where
|
||||
graffiti: self.graffiti,
|
||||
slasher: self.slasher.clone(),
|
||||
validator_monitor: RwLock::new(validator_monitor),
|
||||
blob_cache: BlobCache::default(),
|
||||
//TODO(sean) should we move kzg solely to the da checker?
|
||||
data_availability_checker: DataAvailabilityChecker::new(kzg.clone()),
|
||||
proposal_blob_cache: BlobCache::default(),
|
||||
kzg,
|
||||
};
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::blob_verification::{AvailabilityPendingBlock, AvailableBlock};
|
||||
use crate::blob_verification::AvailableBlock;
|
||||
use crate::{
|
||||
attester_cache::{CommitteeLengths, Error},
|
||||
metrics,
|
||||
@@ -6,6 +6,7 @@ use crate::{
|
||||
use parking_lot::RwLock;
|
||||
use proto_array::Block as ProtoBlock;
|
||||
use std::sync::Arc;
|
||||
use types::blob_sidecar::BlobSidecarArcList;
|
||||
use types::*;
|
||||
|
||||
pub struct CacheItem<E: EthSpec> {
|
||||
@@ -21,7 +22,8 @@ pub struct CacheItem<E: EthSpec> {
|
||||
* Values used to make the block available.
|
||||
*/
|
||||
block: Arc<SignedBeaconBlock<E>>,
|
||||
blobs: Option<Arc<BlobsSidecar<E>>>,
|
||||
//TODO(sean) remove this and just use the da checker?'
|
||||
blobs: Option<BlobSidecarArcList<E>>,
|
||||
proto_block: ProtoBlock,
|
||||
}
|
||||
|
||||
@@ -160,7 +162,7 @@ impl<E: EthSpec> EarlyAttesterCache<E> {
|
||||
}
|
||||
|
||||
/// Returns the blobs, if `block_root` matches the cached item.
|
||||
pub fn get_blobs(&self, block_root: Hash256) -> Option<Arc<BlobsSidecar<E>>> {
|
||||
pub fn get_blobs(&self, block_root: Hash256) -> Option<BlobSidecarArcList<E>> {
|
||||
self.item
|
||||
.read()
|
||||
.as_ref()
|
||||
|
||||
@@ -134,9 +134,9 @@ impl<T: BeaconChainTypes> PayloadNotifier<T> {
|
||||
/// contains a few extra checks by running `partially_verify_execution_payload` first:
|
||||
///
|
||||
/// https://github.com/ethereum/consensus-specs/blob/v1.1.9/specs/bellatrix/beacon-chain.md#notify_new_payload
|
||||
async fn notify_new_payload<T: BeaconChainTypes>(
|
||||
async fn notify_new_payload<'a, T: BeaconChainTypes>(
|
||||
chain: &Arc<BeaconChain<T>>,
|
||||
block: BeaconBlockRef<T::EthSpec>,
|
||||
block: BeaconBlockRef<'a, T::EthSpec>,
|
||||
) -> Result<PayloadVerificationStatus, BlockError<T::EthSpec>> {
|
||||
let execution_payload = block.execution_payload()?;
|
||||
|
||||
|
||||
@@ -1,47 +1,63 @@
|
||||
use std::collections::{BTreeMap, HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
use crate::blob_verification::{
|
||||
verify_data_availability, AsBlock, AvailableBlock, BlockWrapper, VerifiedBlobs,
|
||||
};
|
||||
use crate::block_verification::{ExecutedBlock, IntoExecutionPendingBlock};
|
||||
use crate::kzg_utils::validate_blob;
|
||||
use crate::{BeaconChain, BeaconChainError, BeaconChainTypes, BlockError};
|
||||
use kzg::Error as KzgError;
|
||||
use kzg::{Kzg, KzgCommitment};
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use kzg::KzgCommitment;
|
||||
use ssz_types::VariableList;
|
||||
use ssz_types::{Error, VariableList};
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::{BTreeMap, HashMap, HashSet};
|
||||
use std::future::Future;
|
||||
use std::sync::{mpsc, Arc};
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use types::blob_sidecar::{BlobIdentifier, BlobSidecar};
|
||||
use types::{EthSpec, Hash256};
|
||||
use crate::blob_verification::{AvailabilityPendingBlock, verify_data_availability};
|
||||
use crate::block_verification::IntoExecutionPendingBlock;
|
||||
use types::{EthSpec, Hash256, SignedBeaconBlock, SignedBlobSidecar};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum AvailabilityCheckError {
|
||||
DuplicateBlob(Hash256),
|
||||
Kzg(KzgError),
|
||||
SszTypes(ssz_types::Error),
|
||||
}
|
||||
|
||||
impl From<ssz_types::Error> for AvailabilityCheckError {
|
||||
fn from(value: Error) -> Self {
|
||||
Self::SszTypes(value)
|
||||
}
|
||||
}
|
||||
|
||||
/// This cache contains
|
||||
/// - blobs that have been gossip verified
|
||||
/// - commitments for blocks that have been gossip verified, but the commitments themselves
|
||||
/// have not been verified against blobs
|
||||
/// - blocks that have been fully verified and only require a data availability check
|
||||
pub struct GossipBlobCache<T: EthSpec> {
|
||||
blob_cache: Mutex<GossipBlobCacheInner<T>>
|
||||
pub struct DataAvailabilityChecker<T: EthSpec> {
|
||||
rpc_blob_cache: RwLock<HashMap<BlobIdentifier, Arc<BlobSidecar<T>>>>,
|
||||
gossip_blob_cache: Mutex<HashMap<Hash256, GossipBlobCache<T>>>,
|
||||
kzg: Option<Arc<Kzg>>,
|
||||
}
|
||||
|
||||
struct GossipBlobCacheInner<T: EthSpec> {
|
||||
// used when all blobs are not yet present and when the block is not yet present
|
||||
|
||||
//TODO(sean) do we want two versions of this cache, one meant to serve RPC?
|
||||
unverified_blobs: BTreeMap<BlobIdentifier, Arc<BlobSidecar<T>>>,
|
||||
// used when the block was fully processed before we received all blobs
|
||||
availability_pending_blocks: HashMap<Hash256, AvailabilityPendingBlock<T>>,
|
||||
// used to cache kzg commitments from gossip verified blocks in case we receive all blobs during block processing
|
||||
unverified_commitments: HashMap<Hash256, VariableList<KzgCommitment, T::MaxBlobsPerBlock>>,
|
||||
// used when block + blob kzg verification completes prior before block processing
|
||||
verified_commitments: HashSet<Hash256>,
|
||||
pub enum Availability<T: EthSpec> {
|
||||
PendingBlobs(Vec<BlobIdentifier>),
|
||||
PendingBlock(Hash256),
|
||||
Available(ExecutedBlock<T>),
|
||||
}
|
||||
|
||||
impl <T: EthSpec> GossipBlobCache<T> {
|
||||
pub fn new() -> Self {
|
||||
struct GossipBlobCache<T: EthSpec> {
|
||||
verified_blobs: Vec<Arc<BlobSidecar<T>>>,
|
||||
executed_block: Option<ExecutedBlock<T>>,
|
||||
}
|
||||
|
||||
impl<T: EthSpec> DataAvailabilityChecker<T> {
|
||||
pub fn new(kzg: Option<Arc<Kzg>>) -> Self {
|
||||
Self {
|
||||
blob_cache: Mutex::new(GossipBlobCacheInner {
|
||||
unverified_blobs: BTreeMap::new(),
|
||||
availability_pending_blocks: HashMap::new(),
|
||||
unverified_commitments: HashMap::new(),
|
||||
verified_commitments: HashSet::new(),
|
||||
})
|
||||
rpc_blob_cache: <_>::default(),
|
||||
gossip_blob_cache: <_>::default(),
|
||||
kzg,
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/// When we receive a blob check if we've cached it. If it completes a set and we have the
|
||||
@@ -49,59 +65,192 @@ impl <T: EthSpec> GossipBlobCache<T> {
|
||||
/// cached, verify the block and import it.
|
||||
///
|
||||
/// This should only accept gossip verified blobs, so we should not have to worry about dupes.
|
||||
pub fn put_blob(&self, blob: Arc<BlobSidecar<T>>) {
|
||||
let blob_id = blob.id();
|
||||
let blob_cache = self.blob_cache.lock();
|
||||
|
||||
if let Some(dup) = blob_cache.unverified_blobs.insert(blob_id, blob) {
|
||||
// return error relating to gossip validation failure
|
||||
}
|
||||
|
||||
if let Some(availability_pending_block) = blob_cache.availability_pending_blocks.get(&blob.block_root) {
|
||||
let num_blobs = availability_pending_block.kzg_commitments().len();
|
||||
let mut blobs : Vec<BlobIdentifier, BlobSidecar<T>> = blob_cache.unverified_blobs.range(BlobIdentifier::new(blob.block_root, 0)
|
||||
..BlobIdentifier::new(blob.block_root, num_blobs as u64)).collect();
|
||||
|
||||
if blobs.len() == num_blobs {
|
||||
// verify
|
||||
// import
|
||||
}
|
||||
} else if let Some(commitments) = blob_cache.unverified_commitments.get(&blob.block_root) {
|
||||
let num_blobs = commitments.len();
|
||||
let mut blobs : Vec<BlobIdentifier, BlobSidecar<T>> = blob_cache.unverified_blobs.range(BlobIdentifier::new(blob.block_root, 0)
|
||||
..BlobIdentifier::new(blob.block_root, num_blobs as u64)).collect();
|
||||
|
||||
if blobs.len() == num_blobs {
|
||||
// verify
|
||||
// cache
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
pub fn put_commitments(&self, block_root: Hash256, kzg_commitments: VariableList<KzgCommitment, T::MaxBlobsPerBlock>) {
|
||||
let blob_cache = self.blob_cache.lock();
|
||||
if let Some(dup) = blob_cache.unverified_commitments.insert(block_root, kzg_commitments) {
|
||||
// return error relating to gossip validation failure
|
||||
}
|
||||
|
||||
let num_blobs = commitments.len();
|
||||
let mut blobs : Vec<BlobIdentifier, BlobSidecar<T>> = blob_cache.unverified_blobs.range(BlobIdentifier::new(blob.block_root, 0)
|
||||
..BlobIdentifier::new(blob.block_root, num_blobs as u64)).collect();
|
||||
|
||||
if blobs.len() == num_blobs {
|
||||
// verify
|
||||
// cache
|
||||
}
|
||||
}
|
||||
|
||||
pub fn check_availability_and_import(&self, block_root: Hash256, block: AvailabilityPendingBlock<T>) -> bool {
|
||||
let blob_cache = self.blob_cache.lock();
|
||||
if blob_cache.verified_commitments.contains(&block_root) {
|
||||
true
|
||||
} else {
|
||||
// cache the block
|
||||
// return an enum here that may include the full block
|
||||
pub fn put_blob(
|
||||
&self,
|
||||
blob: Arc<BlobSidecar<T>>,
|
||||
) -> Result<Availability<T>, AvailabilityCheckError> {
|
||||
let verified = if let Some(kzg) = self.kzg.as_ref() {
|
||||
validate_blob::<T>(
|
||||
kzg,
|
||||
blob.blob.clone(),
|
||||
blob.kzg_commitment.clone(),
|
||||
blob.kzg_proof,
|
||||
)
|
||||
.map_err(|e| AvailabilityCheckError::Kzg(e))?
|
||||
} else {
|
||||
false
|
||||
}
|
||||
// error wrong fork
|
||||
};
|
||||
|
||||
// TODO(remove clones)
|
||||
|
||||
if verified {
|
||||
let mut blob_cache = self.gossip_blob_cache.lock();
|
||||
|
||||
// Gossip cache.
|
||||
blob_cache
|
||||
.entry(blob.block_root)
|
||||
.and_modify(|mut inner| {
|
||||
// All blobs reaching this cache should be gossip verified and gossip verification
|
||||
// should filter duplicates, as well as validate indices.
|
||||
inner
|
||||
.verified_blobs
|
||||
.insert(blob.index as usize, blob.clone());
|
||||
|
||||
if let Some(executed_block) = inner.executed_block.take() {
|
||||
let verified_commitments: Vec<_> = inner
|
||||
.verified_blobs
|
||||
.iter()
|
||||
.map(|blob| blob.kzg_commitment)
|
||||
.collect();
|
||||
if verified_commitments
|
||||
== executed_block
|
||||
.block
|
||||
.as_block()
|
||||
.message_eip4844()
|
||||
.unwrap() //TODO(sean) errors
|
||||
.body
|
||||
.blob_kzg_commitments
|
||||
.clone()
|
||||
.to_vec()
|
||||
{
|
||||
// send to reprocessing queue ?
|
||||
//TODO(sean) try_send?
|
||||
//TODO(sean) errors
|
||||
} else {
|
||||
let _ = inner.executed_block.insert(executed_block);
|
||||
}
|
||||
}
|
||||
})
|
||||
.or_insert(GossipBlobCache {
|
||||
verified_blobs: vec![blob.clone()],
|
||||
executed_block: None,
|
||||
});
|
||||
|
||||
drop(blob_cache);
|
||||
|
||||
// RPC cache.
|
||||
self.rpc_blob_cache.write().insert(blob.id(), blob.clone());
|
||||
}
|
||||
|
||||
Ok(Availability::PendingBlobs(vec![]))
|
||||
}
|
||||
|
||||
// return an enum here that may include the full block
|
||||
pub fn check_block_availability(
|
||||
&self,
|
||||
executed_block: ExecutedBlock<T>,
|
||||
) -> Result<Availability<T>, AvailabilityCheckError> {
|
||||
let block_clone = executed_block.block.clone();
|
||||
|
||||
let availability = match block_clone {
|
||||
BlockWrapper::Available(available_block) => Availability::Available(executed_block),
|
||||
BlockWrapper::AvailabilityPending(block) => {
|
||||
if let Ok(kzg_commitments) = block.message().body().blob_kzg_commitments() {
|
||||
// first check if the blockwrapper contains blobs, if so, use those
|
||||
|
||||
let mut guard = self.gossip_blob_cache.lock();
|
||||
let entry = guard.entry(executed_block.block_root);
|
||||
|
||||
match entry {
|
||||
Entry::Occupied(mut occupied_entry) => {
|
||||
let cache: &mut GossipBlobCache<T> = occupied_entry.get_mut();
|
||||
|
||||
let verified_commitments: Vec<_> = cache
|
||||
.verified_blobs
|
||||
.iter()
|
||||
.map(|blob| blob.kzg_commitment)
|
||||
.collect();
|
||||
if verified_commitments == kzg_commitments.clone().to_vec() {
|
||||
let removed: GossipBlobCache<T> = occupied_entry.remove();
|
||||
|
||||
let ExecutedBlock {
|
||||
block: _,
|
||||
block_root,
|
||||
state,
|
||||
parent_block,
|
||||
parent_eth1_finalization_data,
|
||||
confirmed_state_roots,
|
||||
consensus_context,
|
||||
payload_verification_outcome,
|
||||
} = executed_block;
|
||||
|
||||
let available_block = BlockWrapper::Available(AvailableBlock {
|
||||
block,
|
||||
blobs: VerifiedBlobs::Available(VariableList::new(
|
||||
removed.verified_blobs,
|
||||
)?),
|
||||
});
|
||||
|
||||
let available_executed = ExecutedBlock {
|
||||
block: available_block,
|
||||
block_root,
|
||||
state,
|
||||
parent_block,
|
||||
parent_eth1_finalization_data,
|
||||
confirmed_state_roots,
|
||||
consensus_context,
|
||||
payload_verification_outcome,
|
||||
};
|
||||
Availability::Available(available_executed)
|
||||
} else {
|
||||
let mut missing_blobs = Vec::with_capacity(kzg_commitments.len());
|
||||
for i in 0..kzg_commitments.len() {
|
||||
if cache.verified_blobs.get(i).is_none() {
|
||||
missing_blobs.push(BlobIdentifier {
|
||||
block_root: executed_block.block_root,
|
||||
index: i as u64,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
//TODO(sean) add a check that missing blobs > 0
|
||||
|
||||
let _ = cache.executed_block.insert(executed_block.clone());
|
||||
// log that we cached the block?
|
||||
Availability::PendingBlobs(missing_blobs)
|
||||
}
|
||||
}
|
||||
Entry::Vacant(vacant_entry) => {
|
||||
let mut blob_ids = Vec::with_capacity(kzg_commitments.len());
|
||||
for i in 0..kzg_commitments.len() {
|
||||
blob_ids.push(BlobIdentifier {
|
||||
block_root: executed_block.block_root,
|
||||
index: i as u64,
|
||||
});
|
||||
}
|
||||
|
||||
vacant_entry.insert(GossipBlobCache {
|
||||
verified_blobs: vec![],
|
||||
executed_block: Some(executed_block),
|
||||
});
|
||||
|
||||
Availability::PendingBlobs(blob_ids)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Availability::Available(executed_block)
|
||||
}
|
||||
}
|
||||
};
|
||||
Ok(availability)
|
||||
}
|
||||
|
||||
/// Adds the blob to the cache. Returns true if adding the blob completes
|
||||
/// all the required blob sidecars for a given block root.
|
||||
///
|
||||
/// Note: we can only know this if we know `block.kzg_commitments.len()`
|
||||
pub fn put_blob_temp(
|
||||
&self,
|
||||
blob: Arc<SignedBlobSidecar<T>>,
|
||||
) -> Result<bool, AvailabilityCheckError> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
/// Returns all blobs associated with a given block root otherwise returns
|
||||
/// a UnavailableBlobs error.
|
||||
pub fn blobs(&self, block_root: Hash256) -> Result<VerifiedBlobs<T>, AvailabilityCheckError> {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@ pub mod events;
|
||||
pub mod execution_payload;
|
||||
pub mod fork_choice_signal;
|
||||
pub mod fork_revert;
|
||||
pub mod gossip_blob_cache;
|
||||
mod head_tracker;
|
||||
pub mod historical_blocks;
|
||||
pub mod kzg_utils;
|
||||
@@ -51,12 +52,12 @@ pub mod test_utils;
|
||||
mod timeout_rw_lock;
|
||||
pub mod validator_monitor;
|
||||
pub mod validator_pubkey_cache;
|
||||
pub mod gossip_blob_cache;
|
||||
|
||||
pub use self::beacon_chain::{
|
||||
AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, BeaconStore, ChainSegmentResult,
|
||||
CountUnrealized, ForkChoiceError, OverrideForkchoiceUpdate, ProduceBlockVerification,
|
||||
StateSkipConfig, WhenSlotSkipped, INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON,
|
||||
AttestationProcessingOutcome, AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes,
|
||||
BeaconStore, ChainSegmentResult, CountUnrealized, ForkChoiceError, OverrideForkchoiceUpdate,
|
||||
ProduceBlockVerification, StateSkipConfig, WhenSlotSkipped,
|
||||
INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON,
|
||||
INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON, MAXIMUM_GOSSIP_CLOCK_DISPARITY,
|
||||
};
|
||||
pub use self::beacon_snapshot::BeaconSnapshot;
|
||||
@@ -66,7 +67,7 @@ pub use self::historical_blocks::HistoricalBlockError;
|
||||
pub use attestation_verification::Error as AttestationError;
|
||||
pub use beacon_fork_choice_store::{BeaconForkChoiceStore, Error as ForkChoiceStoreError};
|
||||
pub use block_verification::{
|
||||
get_block_root, BlockError, ExecutionPayloadError, GossipVerifiedBlock,
|
||||
get_block_root, BlockError, ExecutedBlock, ExecutionPayloadError, GossipVerifiedBlock,
|
||||
};
|
||||
pub use canonical_head::{CachedHead, CanonicalHead, CanonicalHeadRwLock};
|
||||
pub use eth1_chain::{Eth1Chain, Eth1ChainBackend};
|
||||
|
||||
@@ -1680,7 +1680,8 @@ where
|
||||
NotifyExecutionLayer::Yes,
|
||||
)
|
||||
.await?
|
||||
.into();
|
||||
.try_into()
|
||||
.unwrap();
|
||||
self.chain.recompute_head_at_current_slot().await;
|
||||
Ok(block_hash)
|
||||
}
|
||||
@@ -1698,7 +1699,8 @@ where
|
||||
NotifyExecutionLayer::Yes,
|
||||
)
|
||||
.await?
|
||||
.into();
|
||||
.try_into()
|
||||
.unwrap();
|
||||
self.chain.recompute_head_at_current_slot().await;
|
||||
Ok(block_hash)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user