tons of changes, just tryna compile

This commit is contained in:
realbigsean
2023-03-16 22:25:04 -04:00
parent 9df968c992
commit be4d70eeff
17 changed files with 292 additions and 250 deletions

View File

@@ -8,7 +8,7 @@ use crate::beacon_proposer_cache::compute_proposer_duties_from_head;
use crate::beacon_proposer_cache::BeaconProposerCache;
use crate::blob_cache::BlobCache;
use crate::blob_verification::{
AsBlock, AvailableBlock, BlobError, Blobs, BlockWrapper, IntoAvailableBlock,
AsBlock, AvailableBlock, BlobError, BlockWrapper, IntoAvailableBlock, VerifiedBlobs,
};
use crate::block_times_cache::BlockTimesCache;
use crate::block_verification::{
@@ -25,6 +25,7 @@ use crate::eth1_finalization_cache::{Eth1FinalizationCache, Eth1FinalizationData
use crate::events::ServerSentEventHandler;
use crate::execution_payload::{get_execution_payload, NotifyExecutionLayer, PreparePayloadHandle};
use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx, ForkChoiceWaitResult};
use crate::gossip_blob_cache::DataAvailabilityChecker;
use crate::head_tracker::HeadTracker;
use crate::historical_blocks::HistoricalBlockError;
use crate::kzg_utils;
@@ -76,6 +77,7 @@ use futures::channel::mpsc::Sender;
use itertools::process_results;
use itertools::Itertools;
use kzg::Kzg;
use oneshot_broadcast::Receiver;
use operation_pool::{AttestationRef, OperationPool, PersistedOperationPool, ReceivedPreCapella};
use parking_lot::{Mutex, RwLock};
use proto_array::{CountUnrealizedFull, DoNotReOrg, ProposerHeadError};
@@ -112,7 +114,9 @@ use store::{
use task_executor::{ShutdownReason, TaskExecutor};
use tokio::task::JoinHandle;
use tree_hash::TreeHash;
use types::beacon_block_body::KzgCommitments;
use types::beacon_state::CloneConfig;
use types::blob_sidecar::Blobs;
use types::consts::eip4844::MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS;
use types::consts::merge::INTERVALS_PER_SLOT;
use types::*;
@@ -276,7 +280,7 @@ pub enum StateSkipConfig {
pub enum BlockProcessingResult<T: BeaconChainTypes> {
Verified(Hash256),
AvailabilityPending(ExecutedBlock<T>),
AvailabilityPending(ExecutedBlock<T::EthSpec>),
}
pub trait BeaconChainTypes: Send + Sync + 'static {
@@ -440,7 +444,8 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub slasher: Option<Arc<Slasher<T::EthSpec>>>,
/// Provides monitoring of a set of explicitly defined validators.
pub validator_monitor: RwLock<ValidatorMonitor<T::EthSpec>>,
pub blob_cache: BlobCache<T::EthSpec>,
pub proposal_blob_cache: BlobCache<T::EthSpec>,
pub data_availability_checker: Option<DataAvailabilityChecker<T::EthSpec>>,
pub kzg: Option<Arc<Kzg>>,
}
@@ -971,35 +976,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub async fn get_block_and_blobs_checking_early_attester_cache(
&self,
block_root: &Hash256,
) -> Result<Option<SignedBeaconBlockAndBlobsSidecar<T::EthSpec>>, Error> {
// If there is no data availability boundary, the Eip4844 fork is disabled.
if let Some(finalized_data_availability_boundary) =
self.finalized_data_availability_boundary()
{
// Only use the attester cache if we can find both the block and blob
if let (Some(block), Some(blobs)) = (
self.early_attester_cache.get_block(*block_root),
self.early_attester_cache.get_blobs(*block_root),
) {
Ok(Some(SignedBeaconBlockAndBlobsSidecar {
beacon_block: block,
blobs_sidecar: blobs,
}))
// Attempt to get the block and blobs from the database
} else if let Some(block) = self.get_block(block_root).await?.map(Arc::new) {
let blobs = self
.get_blobs(block_root, finalized_data_availability_boundary)?
.map(Arc::new);
Ok(blobs.map(|blobs| SignedBeaconBlockAndBlobsSidecar {
beacon_block: block,
blobs_sidecar: blobs,
}))
} else {
Ok(None)
}
} else {
Ok(None)
}
) -> Result<Option<()>, Error> {
//TODO(sean) use the rpc blobs cache and revert this to the current block cache logic
Ok(Some(()))
}
/// Returns the block at the given root, if any.
@@ -1083,7 +1062,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
block_root: &Hash256,
) -> Result<Option<BlobSidecarList<T::EthSpec>>, Error> {
self.store.get_blobs(block_root)
Ok(self.store.get_blobs(block_root)?)
}
pub fn get_blinded_block(
@@ -2674,7 +2653,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
unverified_block: B,
count_unrealized: CountUnrealized,
notify_execution_layer: NotifyExecutionLayer,
) -> Result<BlockProcessingResult<T>, BlockError<T::EthSpec>> {
) -> Result<Hash256, BlockError<T::EthSpec>> {
// Start the Prometheus timer.
let _full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES);
@@ -2692,19 +2671,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// TODO(log required errors)
let executed_block = self
.clone()
.into_executed_block(execution_pending, count_unrealized)
.await?;
// Check if the executed block has all it's blobs available to qualify as a fully
// available block
let import_block = if let Ok(blobs) = self
.gossip_blob_cache
.lock()
.blobs(executed_block.block_root)
{
self.import_available_block(executed_block, blobs, count_unrealized)
let import_block = if let Some(da_checker) = self.data_availability_checker.as_ref() {
da_checker.put_block(executed_block); //TODO(sean) errors
return Err(BlockError::AvailabilityPending(block_root));
} else {
return Ok(BlockProcessingResult::AvailabilityPending(executed_block));
self.clone().import_available_block(
executed_block,
VerifiedBlobs::PreEip4844,
count_unrealized,
)
};
// Verify and import the block.
@@ -2721,7 +2702,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Increment the Prometheus counter for block processing successes.
metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES);
Ok(BlockProcessingResult::Verified(block_root))
Ok(block_root)
}
Err(e @ BlockError::BeaconChainError(BeaconChainError::TokioJoin(_))) => {
debug!(
@@ -2761,7 +2742,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self: Arc<Self>,
execution_pending_block: ExecutionPendingBlock<T>,
count_unrealized: CountUnrealized,
) -> Result<ExecutedBlock<T>, BlockError<T::EthSpec>> {
) -> Result<ExecutedBlock<T::EthSpec>, BlockError<T::EthSpec>> {
let ExecutionPendingBlock {
block,
block_root,
@@ -2826,8 +2807,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// (i.e., this function is not atomic).
async fn import_available_block(
self: Arc<Self>,
executed_block: ExecutedBlock<T>,
blobs: Blobs<T::EthSpec>,
executed_block: ExecutedBlock<T::EthSpec>,
blobs: VerifiedBlobs<T::EthSpec>,
count_unrealized: CountUnrealized,
) -> Result<Hash256, BlockError<T::EthSpec>> {
let ExecutedBlock {
@@ -4802,12 +4783,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.as_ref()
.ok_or(BlockProductionError::TrustedSetupNotInitialized)?;
let beacon_block_root = block.canonical_root();
let expected_kzg_commitments: &KzgCommitments<T::EthSpec> =
block.body().blob_kzg_commitments().map_err(|_| {
BlockProductionError::InvalidBlockVariant(
"EIP4844 block does not contain kzg commitments".to_string(),
)
})?;
let expected_kzg_commitments = block.body().blob_kzg_commitments().map_err(|_| {
BlockProductionError::InvalidBlockVariant(
"EIP4844 block does not contain kzg commitments".to_string(),
)
})?;
if expected_kzg_commitments.len() != blobs.len() {
return Err(BlockProductionError::MissingKzgCommitment(format!(
@@ -4856,7 +4836,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.collect::<Result<Vec<BlobSidecar<T::EthSpec>>, BlockProductionError>>()?,
);
self.blob_cache.put(beacon_block_root, blob_sidecars);
self.proposal_blob_cache
.put(beacon_block_root, blob_sidecars);
}
metrics::inc_counter(&metrics::BLOCK_PRODUCTION_SUCCESSES);
@@ -4874,8 +4855,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
fn compute_blob_kzg_proofs(
kzg: &Arc<Kzg>,
blobs: &Blobs<<T as BeaconChainTypes>::EthSpec>,
expected_kzg_commitments: &KzgCommitments<<T as BeaconChainTypes>::EthSpec>,
blobs: &Blobs<T::EthSpec>,
expected_kzg_commitments: &KzgCommitments<T::EthSpec>,
slot: Slot,
) -> Result<Vec<KzgProof>, BlockProductionError> {
blobs
@@ -6174,47 +6155,50 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.unwrap_or(false))
}
pub async fn check_data_availability(
&self,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
) -> Result<AvailableBlock<T>, Error> {
let kzg_commitments = block
.message()
.body()
.blob_kzg_commitments()
.map_err(|_| BlobError::KzgCommitmentMissing)?;
let transactions = block
.message()
.body()
.execution_payload_eip4844()
.map(|payload| payload.transactions())
.map_err(|_| BlobError::TransactionsMissing)?
.ok_or(BlobError::TransactionsMissing)?;
pub fn start_block_importer(
self: &Arc<Self>,
mut rx: tokio::sync::mpsc::Receiver<ExecutedBlock<T::EthSpec>>,
) {
let chain = self.clone();
self.task_executor.spawn(
async move {
while let Some(block) = rx.recv().await {
let ExecutedBlock {
block,
block_root,
state,
parent_block,
parent_eth1_finalization_data,
confirmed_state_roots,
consensus_context,
payload_verification_outcome,
} = block;
if verify_kzg_commitments_against_transactions::<T::EthSpec>(transactions, kzg_commitments)
.is_err()
{
return Err(BlobError::TransactionCommitmentMismatch);
}
let available_block = block.into_available_block().unwrap(); //TODO(sean) remove unwrap
// Validate that the kzg proof is valid against the commitments and blobs
let kzg = self
.kzg
.as_ref()
.ok_or(BlobError::TrustedSetupNotInitialized)?;
if !kzg_utils::validate_blobs_sidecar(
kzg,
block_slot,
block_root,
kzg_commitments,
blob_sidecar,
)
.map_err(BlobError::KzgError)?
{
return Err(BlobError::InvalidKzgProof);
}
Ok(())
let chain_inner = chain.clone();
let block_hash = chain
.spawn_blocking_handle(
move || {
chain_inner.import_block(
available_block,
block_root,
state,
confirmed_state_roots,
payload_verification_outcome.payload_verification_status,
CountUnrealized::True, //TODO(sean)
parent_block,
parent_eth1_finalization_data,
consensus_context,
)
},
"block_importer",
)
.await;
}
},
"block_importer_listener",
);
}
}

View File

@@ -1,3 +1,4 @@
use derivative::Derivative;
use slot_clock::SlotClock;
use std::sync::Arc;
@@ -301,30 +302,36 @@ impl<T: BeaconChainTypes> IntoAvailableBlock<T> for BlockWrapper<T::EthSpec> {
}
}
#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, Derivative)]
#[derivative(Hash(bound = "T: EthSpec"))]
pub struct AvailableBlock<T: EthSpec> {
pub block: Arc<SignedBeaconBlock<T>>,
pub blobs: Blobs<T>,
pub blobs: VerifiedBlobs<T>,
}
impl<T: EthSpec> AvailableBlock<T> {
pub fn blobs(&self) -> Option<Arc<BlobSidecarList<T>>> {
match &self.blobs {
Blobs::NotRequired | Blobs::None => None,
Blobs::Available(blobs) => Some(blobs.clone()),
VerifiedBlobs::EmptyBlobs | VerifiedBlobs::NotRequired | VerifiedBlobs::PreEip4844 => {
None
}
VerifiedBlobs::Available(blobs) => Some(blobs.clone()),
}
}
pub fn deconstruct(self) -> (Arc<SignedBeaconBlock<T>>, Option<Arc<BlobSidecarList<T>>>) {
match self.blobs {
Blobs::NotRequired | Blobs::None => (self.block, None),
Blobs::Available(blobs) => (self.block, Some(blobs)),
VerifiedBlobs::EmptyBlobs | VerifiedBlobs::NotRequired | VerifiedBlobs::PreEip4844 => {
(self.block, None)
}
VerifiedBlobs::Available(blobs) => (self.block, Some(blobs)),
}
}
}
#[derive(Clone, Debug, PartialEq)]
pub enum Blobs<E: EthSpec> {
#[derive(Clone, Debug, PartialEq, Derivative)]
#[derivative(Hash(bound = "E: EthSpec"))]
pub enum VerifiedBlobs<E: EthSpec> {
/// These blobs are available.
Available(Arc<BlobSidecarList<E>>),
/// This block is from outside the data availability boundary so doesn't require
@@ -333,7 +340,7 @@ pub enum Blobs<E: EthSpec> {
/// The block's `kzg_commitments` field is empty so it does not contain any blobs.
EmptyBlobs,
/// This is a block prior to the 4844 fork, so doesn't require any blobs
None,
PreEip4844,
}
pub trait AsBlock<E: EthSpec> {
@@ -348,7 +355,8 @@ pub trait AsBlock<E: EthSpec> {
fn canonical_root(&self) -> Hash256;
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Derivative)]
#[derivative(Hash(bound = "E: EthSpec"))]
pub enum BlockWrapper<E: EthSpec> {
/// This variant is fully available.
/// i.e. for pre-4844 blocks, it contains a (`SignedBeaconBlock`, `Blobs::None`) and for

View File

@@ -64,6 +64,7 @@ use crate::{
},
metrics, BeaconChain, BeaconChainError, BeaconChainTypes,
};
use derivative::Derivative;
use eth2::types::EventKind;
use execution_layer::PayloadStatus;
use fork_choice::{AttestationFromBlock, PayloadVerificationStatus};
@@ -306,6 +307,7 @@ pub enum BlockError<T: EthSpec> {
parent_root: Hash256,
},
BlobValidation(BlobError),
AvailabilityPending(Hash256),
}
impl<T: EthSpec> From<BlobError> for BlockError<T> {
@@ -487,6 +489,7 @@ impl<T: EthSpec> From<ArithError> for BlockError<T> {
}
/// Stores information about verifying a payload against an execution engine.
#[derive(Clone)]
pub struct PayloadVerificationOutcome {
pub payload_verification_status: PayloadVerificationStatus,
pub is_valid_merge_transition_block: bool,
@@ -619,7 +622,8 @@ pub fn signature_verify_chain_segment<T: BeaconChainTypes>(
/// A wrapper around a `SignedBeaconBlock` that indicates it has been approved for re-gossiping on
/// the p2p network.
#[derive(Debug)]
#[derive(Derivative)]
#[derivative(Debug(bound = "T: BeaconChainTypes"))]
pub struct GossipVerifiedBlock<T: BeaconChainTypes> {
pub block: BlockWrapper<T::EthSpec>,
pub block_root: Hash256,
@@ -663,14 +667,15 @@ pub struct ExecutionPendingBlock<T: BeaconChainTypes> {
pub payload_verification_handle: PayloadVerificationHandle<T::EthSpec>,
}
pub struct ExecutedBlock<T: BeaconChainTypes> {
pub block: BlockWrapper<T::EthSpec>,
#[derive(Clone)]
pub struct ExecutedBlock<E: EthSpec> {
pub block: BlockWrapper<E>,
pub block_root: Hash256,
pub state: BeaconState<T::EthSpec>,
pub parent_block: SignedBeaconBlock<T::EthSpec, BlindedPayload<T::EthSpec>>,
pub state: BeaconState<E>,
pub parent_block: SignedBeaconBlock<E, BlindedPayload<E>>,
pub parent_eth1_finalization_data: Eth1FinalizationData,
pub confirmed_state_roots: Vec<Hash256>,
pub consensus_context: ConsensusContext<T::EthSpec>,
pub consensus_context: ConsensusContext<E>,
pub payload_verification_outcome: PayloadVerificationOutcome,
}
@@ -1156,7 +1161,7 @@ impl<T: BeaconChainTypes> IntoExecutionPendingBlock<T> for BlockWrapper<T::EthSp
BlockWrapper::AvailabilityPending(block) => block
.into_execution_pending_block_slashable(block_root, chain, notify_execution_layer),
BlockWrapper::Available(AvailableBlock { block, blobs }) => {
let execution_pending_block = block.into_execution_pending_block_slashable(
let mut execution_pending_block = block.into_execution_pending_block_slashable(
block_root,
chain,
notify_execution_layer,

View File

@@ -1,9 +1,11 @@
use crate::beacon_chain::{CanonicalHead, BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, OP_POOL_DB_KEY};
use crate::blob_cache::BlobCache;
use crate::block_verification::ExecutedBlock;
use crate::eth1_chain::{CachingEth1Backend, SszEth1};
use crate::eth1_finalization_cache::Eth1FinalizationCache;
use crate::fork_choice_signal::ForkChoiceSignalTx;
use crate::fork_revert::{reset_fork_choice_to_finalization, revert_to_fork_boundary};
use crate::gossip_blob_cache::DataAvailabilityChecker;
use crate::head_tracker::HeadTracker;
use crate::migrate::{BackgroundMigrator, MigratorConfig};
use crate::persisted_beacon_chain::PersistedBeaconChain;
@@ -85,6 +87,7 @@ pub struct BeaconChainBuilder<T: BeaconChainTypes> {
event_handler: Option<ServerSentEventHandler<T::EthSpec>>,
slot_clock: Option<T::SlotClock>,
shutdown_sender: Option<Sender<ShutdownReason>>,
block_importer_sender: Option<tokio::sync::mpsc::Sender<ExecutedBlock<T::EthSpec>>>,
head_tracker: Option<HeadTracker>,
validator_pubkey_cache: Option<ValidatorPubkeyCache<T>>,
spec: ChainSpec,
@@ -127,6 +130,7 @@ where
event_handler: None,
slot_clock: None,
shutdown_sender: None,
block_importer_sender: None,
head_tracker: None,
validator_pubkey_cache: None,
spec: TEthSpec::default_spec(),
@@ -558,6 +562,14 @@ where
self
}
pub fn block_importer_sender(
mut self,
sender: tokio::sync::mpsc::Sender<ExecutedBlock<TEthSpec>>,
) -> Self {
self.block_importer_sender = Some(sender);
self
}
/// Creates a new, empty operation pool.
fn empty_op_pool(mut self) -> Self {
self.op_pool = Some(OperationPool::new());
@@ -641,12 +653,18 @@ where
slot_clock.now().ok_or("Unable to read slot")?
};
let kzg = if let Some(trusted_setup) = self.trusted_setup {
let (kzg, data_availability_checker) = if let (Some(tx), Some(trusted_setup)) =
(self.block_importer_sender, self.trusted_setup)
{
let kzg = Kzg::new_from_trusted_setup(trusted_setup)
.map_err(|e| format!("Failed to load trusted setup: {:?}", e))?;
Some(Arc::new(kzg))
let kzg_arc = Arc::new(kzg);
(
Some(kzg_arc.clone()),
Some(DataAvailabilityChecker::new(kzg_arc, tx)),
)
} else {
None
(None, None)
};
let initial_head_block_root = fork_choice
@@ -850,7 +868,9 @@ where
graffiti: self.graffiti,
slasher: self.slasher.clone(),
validator_monitor: RwLock::new(validator_monitor),
blob_cache: BlobCache::default(),
//TODO(sean) should we move kzg solely to the da checker?
data_availability_checker,
proposal_blob_cache: BlobCache::default(),
kzg,
};

View File

@@ -134,9 +134,9 @@ impl<T: BeaconChainTypes> PayloadNotifier<T> {
/// contains a few extra checks by running `partially_verify_execution_payload` first:
///
/// https://github.com/ethereum/consensus-specs/blob/v1.1.9/specs/bellatrix/beacon-chain.md#notify_new_payload
async fn notify_new_payload<T: BeaconChainTypes>(
async fn notify_new_payload<'a, T: BeaconChainTypes>(
chain: &Arc<BeaconChain<T>>,
block: BeaconBlockRef<T::EthSpec>,
block: BeaconBlockRef<'a, T::EthSpec>,
) -> Result<PayloadVerificationStatus, BlockError<T::EthSpec>> {
let execution_payload = block.execution_payload()?;

View File

@@ -1,7 +1,7 @@
use crate::blob_verification::verify_data_availability;
use crate::blob_verification::{verify_data_availability, AsBlock};
use crate::block_verification::{ExecutedBlock, IntoExecutionPendingBlock};
use crate::kzg_utils::validate_blob;
use crate::{BeaconChainError, BeaconChainTypes, BlockError};
use crate::{BeaconChain, BeaconChainError, BeaconChainTypes, BlockError};
use eth2::reqwest::header::Entry;
use kzg::Error as KzgError;
use kzg::{Kzg, KzgCommitment};
@@ -9,9 +9,10 @@ use parking_lot::{Mutex, RwLock};
use ssz_types::VariableList;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::future::Future;
use std::sync::Arc;
use std::sync::{mpsc, Arc};
use tokio::sync::mpsc::Sender;
use types::blob_sidecar::{BlobIdentifier, BlobSidecar};
use types::{EthSpec, Hash256};
use types::{EthSpec, Hash256, SignedBeaconBlock};
pub enum BlobCacheError {
DuplicateBlob(Hash256),
@@ -22,23 +23,25 @@ pub enum BlobCacheError {
/// - commitments for blocks that have been gossip verified, but the commitments themselves
/// have not been verified against blobs
/// - blocks that have been fully verified and only require a data availability check
pub struct GossipBlobCache<T: BeaconChainTypes> {
rpc_blob_cache: RwLock<HashMap<BlobIdentifier, Arc<BlobSidecar<T::EthSpec>>>>,
gossip_blob_cache: Mutex<HashMap<Hash256, GossipBlobCacheInner<T>>>,
kzg: Kzg,
pub struct DataAvailabilityChecker<T: EthSpec> {
rpc_blob_cache: RwLock<HashMap<BlobIdentifier, Arc<BlobSidecar<T>>>>,
gossip_blob_cache: Mutex<HashMap<Hash256, GossipBlobCache<T>>>,
kzg: Arc<Kzg>,
tx: Sender<ExecutedBlock<T>>,
}
struct GossipBlobCacheInner<T: BeaconChainTypes> {
verified_blobs: Vec<Arc<BlobSidecar<T::EthSpec>>>,
struct GossipBlobCache<T: EthSpec> {
verified_blobs: Vec<Arc<BlobSidecar<T>>>,
executed_block: Option<ExecutedBlock<T>>,
}
impl<T: BeaconChainTypes> GossipBlobCache<T> {
pub fn new(kzg: Kzg) -> Self {
impl<T: EthSpec> DataAvailabilityChecker<T> {
pub fn new(kzg: Arc<Kzg>, tx: Sender<ExecutedBlock<T>>) -> Self {
Self {
rpc_blob_cache: RwLock::new(HashMap::new()),
gossip_blob_cache: Mutex::new(HashMap::new()),
rpc_blob_cache: <_>::default(),
gossip_blob_cache: <_>::default(),
kzg,
tx,
}
}
@@ -47,9 +50,9 @@ impl<T: BeaconChainTypes> GossipBlobCache<T> {
/// cached, verify the block and import it.
///
/// This should only accept gossip verified blobs, so we should not have to worry about dupes.
pub fn put_blob(&self, blob: Arc<BlobSidecar<T::EthSpec>>) -> Result<(), BlobCacheError> {
pub fn put_blob(&self, blob: Arc<BlobSidecar<T>>) -> Result<(), BlobCacheError> {
// TODO(remove clones)
let verified = validate_blob::<T::EthSpec>(
let verified = validate_blob::<T>(
&self.kzg,
blob.blob.clone(),
blob.kzg_commitment.clone(),
@@ -70,11 +73,33 @@ impl<T: BeaconChainTypes> GossipBlobCache<T> {
.verified_blobs
.insert(blob.index as usize, blob.clone());
if let Some(executed_block) = inner.executed_block.as_ref() {
// trigger reprocessing ?
if let Some(executed_block) = inner.executed_block.take() {
let verified_commitments: Vec<_> = inner
.verified_blobs
.iter()
.map(|blob| blob.kzg_commitment)
.collect();
if verified_commitments
== executed_block
.block
.as_block()
.message_eip4844()
.unwrap() //TODO(sean) errors
.body
.blob_kzg_commitments
.clone()
.to_vec()
{
// send to reprocessing queue ?
//TODO(sean) try_send?
//TODO(sean) errors
self.tx.try_send(executed_block);
} else {
let _ = inner.executed_block.insert(executed_block);
}
}
})
.or_insert(GossipBlobCacheInner {
.or_insert(GossipBlobCache {
verified_blobs: vec![blob.clone()],
executed_block: None,
});
@@ -93,24 +118,26 @@ impl<T: BeaconChainTypes> GossipBlobCache<T> {
guard
.entry(executed_block.block_root)
.and_modify(|cache| {
if let Ok(block) = executed_block.block.message_eip4844() {
let verified_commitments_vec: Vec<_> = cache
let block: &SignedBeaconBlock<T> = executed_block.block.as_block();
if let Ok(block) = block.message_eip4844() {
let verified_commitments: Vec<_> = cache
.verified_blobs
.iter()
.map(|blob| blob.kzg_commitment)
.collect();
let verified_commitments = VariableList::from(verified_commitments_vec);
if verified_commitments == block.body.blob_kzg_commitments {
if verified_commitments == block.body.blob_kzg_commitments.clone().to_vec() {
// send to reprocessing queue ?
//TODO(sean) errors
self.tx.try_send(executed_block.clone());
} else {
let _ = cache.executed_block.insert(executed_block);
let _ = cache.executed_block.insert(executed_block.clone());
// log that we cached
}
} else {
// log error
}
})
.or_insert(GossipBlobCacheInner {
.or_insert(GossipBlobCache {
verified_blobs: vec![],
executed_block: Some(executed_block),
});

View File

@@ -66,7 +66,7 @@ pub use self::historical_blocks::HistoricalBlockError;
pub use attestation_verification::Error as AttestationError;
pub use beacon_fork_choice_store::{BeaconForkChoiceStore, Error as ForkChoiceStoreError};
pub use block_verification::{
get_block_root, BlockError, ExecutionPayloadError, GossipVerifiedBlock,
get_block_root, BlockError, ExecutedBlock, ExecutionPayloadError, GossipVerifiedBlock,
};
pub use canonical_head::{CachedHead, CanonicalHead, CanonicalHeadRwLock};
pub use eth1_chain::{Eth1Chain, Eth1ChainBackend};