From 05db0d2ba3744267f4a68ae4e4029aafc3730868 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Fri, 17 Mar 2023 16:12:40 -0400 Subject: [PATCH] everything, everywhere, all at once --- beacon_node/beacon_chain/src/beacon_chain.rs | 295 +++++++++--------- .../beacon_chain/src/blob_verification.rs | 33 +- .../beacon_chain/src/block_verification.rs | 9 +- beacon_node/beacon_chain/src/builder.rs | 23 +- .../beacon_chain/src/early_attester_cache.rs | 6 +- .../beacon_chain/src/gossip_blob_cache.rs | 182 ++++++++--- beacon_node/beacon_chain/src/lib.rs | 7 +- beacon_node/beacon_chain/src/test_utils.rs | 6 +- beacon_node/client/src/builder.rs | 6 +- beacon_node/http_api/src/publish_blocks.rs | 22 +- .../beacon_processor/worker/gossip_methods.rs | 41 ++- .../beacon_processor/worker/sync_methods.rs | 5 +- beacon_node/network/src/sync/manager.rs | 4 +- beacon_node/store/src/hot_cold_store.rs | 19 +- beacon_node/store/src/lib.rs | 3 +- consensus/types/src/blob_sidecar.rs | 4 + consensus/types/src/signed_blob.rs | 41 ++- 17 files changed, 453 insertions(+), 253 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 5039724649..4115b2965b 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -25,7 +25,7 @@ use crate::eth1_finalization_cache::{Eth1FinalizationCache, Eth1FinalizationData use crate::events::ServerSentEventHandler; use crate::execution_payload::{get_execution_payload, NotifyExecutionLayer, PreparePayloadHandle}; use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx, ForkChoiceWaitResult}; -use crate::gossip_blob_cache::DataAvailabilityChecker; +use crate::gossip_blob_cache::{Availability, AvailabilityCheckError, DataAvailabilityChecker}; use crate::head_tracker::HeadTracker; use crate::historical_blocks::HistoricalBlockError; use crate::kzg_utils; @@ -116,7 +116,7 @@ use tokio::task::JoinHandle; use tree_hash::TreeHash; use types::beacon_block_body::KzgCommitments; use types::beacon_state::CloneConfig; -use types::blob_sidecar::Blobs; +use types::blob_sidecar::{BlobIdentifier, BlobSidecarArcList, Blobs}; use types::consts::eip4844::MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS; use types::consts::merge::INTERVALS_PER_SLOT; use types::*; @@ -189,6 +189,25 @@ pub enum WhenSlotSkipped { Prev, } +#[derive(Debug)] +pub enum AvailabilityProcessingStatus { + PendingBlobs(Vec), + PendingBlock(Hash256), + Imported(Hash256), +} + +//TODO(sean) using this in tests for now +impl TryInto for AvailabilityProcessingStatus { + type Error = (); + + fn try_into(self) -> Result { + match self { + AvailabilityProcessingStatus::Imported(hash) => Ok(hash.into()), + _ => Err(()), + } + } +} + /// The result of a chain segment processing. pub enum ChainSegmentResult { /// Processing this chain segment finished successfully. @@ -445,7 +464,7 @@ pub struct BeaconChain { /// Provides monitoring of a set of explicitly defined validators. pub validator_monitor: RwLock>, pub proposal_blob_cache: BlobCache, - pub data_availability_checker: Option>, + pub data_availability_checker: DataAvailabilityChecker, pub kzg: Option>, } @@ -1061,7 +1080,7 @@ impl BeaconChain { pub fn get_blobs( &self, block_root: &Hash256, - ) -> Result>, Error> { + ) -> Result>, Error> { Ok(self.store.get_blobs(block_root)?) } @@ -2635,6 +2654,18 @@ impl BeaconChain { .map_err(BeaconChainError::TokioJoin)? } + pub async fn process_blob( + self: &Arc, + blob: BlobSidecar, + count_unrealized: CountUnrealized, + ) -> Result> { + self.check_availability_and_maybe_import( + |chain| chain.data_availability_checker.put_blob(Arc::new(blob)), + count_unrealized, + ) + .await + } + /// Returns `Ok(block_root)` if the given `unverified_block` was successfully verified and /// imported into the chain. /// @@ -2653,7 +2684,7 @@ impl BeaconChain { unverified_block: B, count_unrealized: CountUnrealized, notify_execution_layer: NotifyExecutionLayer, - ) -> Result> { + ) -> Result> { // Start the Prometheus timer. let _full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES); @@ -2673,65 +2704,18 @@ impl BeaconChain { let executed_block = self .clone() .into_executed_block(execution_pending, count_unrealized) - .await?; + .await + .map_err(|e| self.handle_block_error(e))?; - // Check if the executed block has all it's blobs available to qualify as a fully - // available block - let import_block = if let Some(da_checker) = self.data_availability_checker.as_ref() { - da_checker.put_block(executed_block); //TODO(sean) errors - return Err(BlockError::AvailabilityPending(block_root)); - } else { - self.clone().import_available_block( - executed_block, - VerifiedBlobs::PreEip4844, - count_unrealized, - ) - }; - - // Verify and import the block. - match import_block.await { - // The block was successfully verified and imported. Yay. - Ok(block_root) => { - trace!( - self.log, - "Beacon block imported"; - "block_root" => ?block_root, - "block_slot" => slot, - ); - - // Increment the Prometheus counter for block processing successes. - metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES); - - Ok(block_root) - } - Err(e @ BlockError::BeaconChainError(BeaconChainError::TokioJoin(_))) => { - debug!( - self.log, - "Beacon block processing cancelled"; - "error" => ?e, - ); - Err(e) - } - // There was an error whilst attempting to verify and import the block. The block might - // be partially verified or partially imported. - Err(BlockError::BeaconChainError(e)) => { - crit!( - self.log, - "Beacon block processing error"; - "error" => ?e, - ); - Err(BlockError::BeaconChainError(e)) - } - // The block failed verification. - Err(other) => { - trace!( - self.log, - "Beacon block rejected"; - "reason" => other.to_string(), - ); - Err(other) - } - } + self.check_availability_and_maybe_import( + |chain| { + chain + .data_availability_checker + .check_block_availability(executed_block) + }, + count_unrealized, + ) + .await } /// Accepts a fully-verified block and awaits on it's payload verification handle to @@ -2800,55 +2784,118 @@ impl BeaconChain { }) } + fn handle_block_error(&self, e: BlockError) -> BlockError { + match e { + e @ BlockError::BeaconChainError(BeaconChainError::TokioJoin(_)) => { + debug!( + self.log, + "Beacon block processing cancelled"; + "error" => ?e, + ); + e + } + BlockError::BeaconChainError(e) => { + crit!( + self.log, + "Beacon block processing error"; + "error" => ?e, + ); + BlockError::BeaconChainError(e) + } + other => { + trace!( + self.log, + "Beacon block rejected"; + "reason" => other.to_string(), + ); + other + } + } + } + /// Accepts a fully-verified, available block and imports it into the chain without performing any /// additional verification. /// /// An error is returned if the block was unable to be imported. It may be partially imported /// (i.e., this function is not atomic). - async fn import_available_block( - self: Arc, - executed_block: ExecutedBlock, - blobs: VerifiedBlobs, + async fn check_availability_and_maybe_import( + self: &Arc, + cache_fn: impl FnOnce(Arc) -> Result, AvailabilityCheckError>, count_unrealized: CountUnrealized, - ) -> Result> { - let ExecutedBlock { - block, - block_root, - state, - parent_block, - confirmed_state_roots, - payload_verification_outcome, - parent_eth1_finalization_data, - consensus_context, - } = executed_block; + ) -> Result> { + let availability = cache_fn(self.clone())?; + match availability { + Availability::Available(block) => { + let ExecutedBlock { + block, + block_root, + state, + parent_block, + parent_eth1_finalization_data, + confirmed_state_roots, + consensus_context, + payload_verification_outcome, + } = block; - let chain = self.clone(); + let available_block = match block { + BlockWrapper::Available(block) => block, + BlockWrapper::AvailabilityPending(_) => { + todo!() // logic error + } + }; - let available_block = AvailableBlock { - block: block.block_cloned(), - blobs: blobs, - }; + let slot = available_block.block.slot(); - let block_hash = self - .spawn_blocking_handle( - move || { - chain.import_block( - available_block, - block_root, - state, - confirmed_state_roots, - payload_verification_outcome.payload_verification_status, - count_unrealized, - parent_block, - parent_eth1_finalization_data, - consensus_context, + // import + let chain = self.clone(); + let result = self + .spawn_blocking_handle( + move || { + chain.import_block( + available_block, + block_root, + state, + confirmed_state_roots, + payload_verification_outcome.payload_verification_status, + count_unrealized, + parent_block, + parent_eth1_finalization_data, + consensus_context, + ) + }, + "payload_verification_handle", ) - }, - "payload_verification_handle", - ) - .await??; + .await + .map_err(|e| { + let b = BlockError::from(e); + self.handle_block_error(b) + })?; - Ok(block_hash) + match result { + // The block was successfully verified and imported. Yay. + Ok(block_root) => { + trace!( + self.log, + "Beacon block imported"; + "block_root" => ?block_root, + "block_slot" => slot, + ); + + // Increment the Prometheus counter for block processing successes. + metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES); + + Ok(AvailabilityProcessingStatus::Imported(block_root)) + } + Err(e) => Err(self.handle_block_error(e)), + } + } + Availability::PendingBlock(block_root) => { + Ok(AvailabilityProcessingStatus::PendingBlock(block_root)) + } + Availability::PendingBlobs(blob_ids) => { + Ok(AvailabilityProcessingStatus::PendingBlobs(blob_ids)) + } + } } /// Accepts a fully-verified and available block and imports it into the chain without performing any @@ -6154,52 +6201,6 @@ impl BeaconChain { .map(|fork_epoch| fork_epoch <= current_epoch) .unwrap_or(false)) } - - pub fn start_block_importer( - self: &Arc, - mut rx: tokio::sync::mpsc::Receiver>, - ) { - let chain = self.clone(); - self.task_executor.spawn( - async move { - while let Some(block) = rx.recv().await { - let ExecutedBlock { - block, - block_root, - state, - parent_block, - parent_eth1_finalization_data, - confirmed_state_roots, - consensus_context, - payload_verification_outcome, - } = block; - - let available_block = block.into_available_block().unwrap(); //TODO(sean) remove unwrap - - let chain_inner = chain.clone(); - let block_hash = chain - .spawn_blocking_handle( - move || { - chain_inner.import_block( - available_block, - block_root, - state, - confirmed_state_roots, - payload_verification_outcome.payload_verification_status, - CountUnrealized::True, //TODO(sean) - parent_block, - parent_eth1_finalization_data, - consensus_context, - ) - }, - "block_importer", - ) - .await; - } - }, - "block_importer_listener", - ); - } } impl Drop for BeaconChain { diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index 013fae1fd5..d80fe03f79 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -1,16 +1,20 @@ use derivative::Derivative; use slot_clock::SlotClock; +use state_processing::ConsensusContext; use std::sync::Arc; use crate::beacon_chain::{ BeaconChain, BeaconChainTypes, MAXIMUM_GOSSIP_CLOCK_DISPARITY, VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT, }; +use crate::snapshot_cache::PreProcessingSnapshot; use crate::BeaconChainError; use state_processing::per_block_processing::eip4844::eip4844::verify_kzg_commitments_against_transactions; +use types::blob_sidecar::BlobSidecarArcList; use types::{ - BeaconBlockRef, BeaconStateError, BlobSidecarList, Epoch, EthSpec, Hash256, KzgCommitment, - SignedBeaconBlock, SignedBeaconBlockHeader, SignedBlobSidecar, Slot, Transactions, + BeaconBlockRef, BeaconStateError, BlobSidecar, BlobSidecarList, Epoch, EthSpec, Hash256, + KzgCommitment, SignedBeaconBlock, SignedBeaconBlockHeader, SignedBlobSidecar, Slot, + Transactions, }; #[derive(Debug)] @@ -122,11 +126,24 @@ impl From for BlobError { } } +/// A wrapper around a `BlobSidecar` that indicates it has been approved for re-gossiping on +/// the p2p network. +#[derive(Debug)] +pub struct GossipVerifiedBlob { + blob: BlobSidecar, +} + +impl GossipVerifiedBlob { + pub fn to_blob(self) -> BlobSidecar { + self.blob + } +} + pub fn validate_blob_sidecar_for_gossip( blob_sidecar: SignedBlobSidecar, subnet: u64, chain: &BeaconChain, -) -> Result<(), BlobError> { +) -> Result, BlobError> { let blob_slot = blob_sidecar.message.slot; let blob_index = blob_sidecar.message.index; let block_root = blob_sidecar.message.block_root; @@ -240,7 +257,9 @@ pub fn validate_blob_sidecar_for_gossip( }); } - Ok(()) + Ok(GossipVerifiedBlob { + blob: blob_sidecar.message, + }) } pub fn verify_data_availability( @@ -310,7 +329,7 @@ pub struct AvailableBlock { } impl AvailableBlock { - pub fn blobs(&self) -> Option>> { + pub fn blobs(&self) -> Option> { match &self.blobs { VerifiedBlobs::EmptyBlobs | VerifiedBlobs::NotRequired | VerifiedBlobs::PreEip4844 => { None @@ -319,7 +338,7 @@ impl AvailableBlock { } } - pub fn deconstruct(self) -> (Arc>, Option>>) { + pub fn deconstruct(self) -> (Arc>, Option>) { match self.blobs { VerifiedBlobs::EmptyBlobs | VerifiedBlobs::NotRequired | VerifiedBlobs::PreEip4844 => { (self.block, None) @@ -333,7 +352,7 @@ impl AvailableBlock { #[derivative(Hash(bound = "E: EthSpec"))] pub enum VerifiedBlobs { /// These blobs are available. - Available(Arc>), + Available(BlobSidecarArcList), /// This block is from outside the data availability boundary so doesn't require /// a data availability check. NotRequired, diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 4ee15663f1..68c4d1cebc 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -54,6 +54,7 @@ use crate::execution_payload::{ is_optimistic_candidate_block, validate_execution_payload_for_gossip, validate_merge_block, AllowOptimisticImport, NotifyExecutionLayer, PayloadNotifier, }; +use crate::gossip_blob_cache::AvailabilityCheckError; use crate::snapshot_cache::PreProcessingSnapshot; use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS; use crate::validator_pubkey_cache::ValidatorPubkeyCache; @@ -307,7 +308,7 @@ pub enum BlockError { parent_root: Hash256, }, BlobValidation(BlobError), - AvailabilityPending(Hash256), + AvailabilityCheck(AvailabilityCheckError), } impl From for BlockError { @@ -316,6 +317,12 @@ impl From for BlockError { } } +impl From for BlockError { + fn from(e: AvailabilityCheckError) -> Self { + Self::AvailabilityCheck(e) + } +} + /// Returned when block validation failed due to some issue verifying /// the execution payload. #[derive(Debug)] diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 36a6a33796..7da25b207f 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -87,7 +87,6 @@ pub struct BeaconChainBuilder { event_handler: Option>, slot_clock: Option, shutdown_sender: Option>, - block_importer_sender: Option>>, head_tracker: Option, validator_pubkey_cache: Option>, spec: ChainSpec, @@ -130,7 +129,6 @@ where event_handler: None, slot_clock: None, shutdown_sender: None, - block_importer_sender: None, head_tracker: None, validator_pubkey_cache: None, spec: TEthSpec::default_spec(), @@ -562,14 +560,6 @@ where self } - pub fn block_importer_sender( - mut self, - sender: tokio::sync::mpsc::Sender>, - ) -> Self { - self.block_importer_sender = Some(sender); - self - } - /// Creates a new, empty operation pool. fn empty_op_pool(mut self) -> Self { self.op_pool = Some(OperationPool::new()); @@ -653,18 +643,13 @@ where slot_clock.now().ok_or("Unable to read slot")? }; - let (kzg, data_availability_checker) = if let (Some(tx), Some(trusted_setup)) = - (self.block_importer_sender, self.trusted_setup) - { + let kzg = if let Some(trusted_setup) = self.trusted_setup { let kzg = Kzg::new_from_trusted_setup(trusted_setup) .map_err(|e| format!("Failed to load trusted setup: {:?}", e))?; let kzg_arc = Arc::new(kzg); - ( - Some(kzg_arc.clone()), - Some(DataAvailabilityChecker::new(kzg_arc, tx)), - ) + Some(kzg_arc) } else { - (None, None) + None }; let initial_head_block_root = fork_choice @@ -869,7 +854,7 @@ where slasher: self.slasher.clone(), validator_monitor: RwLock::new(validator_monitor), //TODO(sean) should we move kzg solely to the da checker? - data_availability_checker, + data_availability_checker: DataAvailabilityChecker::new(kzg.clone()), proposal_blob_cache: BlobCache::default(), kzg, }; diff --git a/beacon_node/beacon_chain/src/early_attester_cache.rs b/beacon_node/beacon_chain/src/early_attester_cache.rs index 11eeeadedc..2520af41a2 100644 --- a/beacon_node/beacon_chain/src/early_attester_cache.rs +++ b/beacon_node/beacon_chain/src/early_attester_cache.rs @@ -6,6 +6,7 @@ use crate::{ use parking_lot::RwLock; use proto_array::Block as ProtoBlock; use std::sync::Arc; +use types::blob_sidecar::BlobSidecarArcList; use types::*; pub struct CacheItem { @@ -21,7 +22,8 @@ pub struct CacheItem { * Values used to make the block available. */ block: Arc>, - blobs: Option>>, + //TODO(sean) remove this and just use the da checker?' + blobs: Option>, proto_block: ProtoBlock, } @@ -160,7 +162,7 @@ impl EarlyAttesterCache { } /// Returns the blobs, if `block_root` matches the cached item. - pub fn get_blobs(&self, block_root: Hash256) -> Option>> { + pub fn get_blobs(&self, block_root: Hash256) -> Option> { self.item .read() .as_ref() diff --git a/beacon_node/beacon_chain/src/gossip_blob_cache.rs b/beacon_node/beacon_chain/src/gossip_blob_cache.rs index ed12970ec6..08fe4a6645 100644 --- a/beacon_node/beacon_chain/src/gossip_blob_cache.rs +++ b/beacon_node/beacon_chain/src/gossip_blob_cache.rs @@ -1,12 +1,14 @@ -use crate::blob_verification::{verify_data_availability, AsBlock}; +use crate::blob_verification::{ + verify_data_availability, AsBlock, AvailableBlock, BlockWrapper, VerifiedBlobs, +}; use crate::block_verification::{ExecutedBlock, IntoExecutionPendingBlock}; use crate::kzg_utils::validate_blob; use crate::{BeaconChain, BeaconChainError, BeaconChainTypes, BlockError}; -use eth2::reqwest::header::Entry; use kzg::Error as KzgError; use kzg::{Kzg, KzgCommitment}; use parking_lot::{Mutex, RwLock}; -use ssz_types::VariableList; +use ssz_types::{Error, VariableList}; +use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap, HashSet}; use std::future::Future; use std::sync::{mpsc, Arc}; @@ -14,10 +16,19 @@ use tokio::sync::mpsc::Sender; use types::blob_sidecar::{BlobIdentifier, BlobSidecar}; use types::{EthSpec, Hash256, SignedBeaconBlock}; -pub enum BlobCacheError { +#[derive(Debug)] +pub enum AvailabilityCheckError { DuplicateBlob(Hash256), Kzg(KzgError), + SszTypes(ssz_types::Error), } + +impl From for AvailabilityCheckError { + fn from(value: Error) -> Self { + Self::SszTypes(value) + } +} + /// This cache contains /// - blobs that have been gossip verified /// - commitments for blocks that have been gossip verified, but the commitments themselves @@ -26,8 +37,13 @@ pub enum BlobCacheError { pub struct DataAvailabilityChecker { rpc_blob_cache: RwLock>>>, gossip_blob_cache: Mutex>>, - kzg: Arc, - tx: Sender>, + kzg: Option>, +} + +pub enum Availability { + PendingBlobs(Vec), + PendingBlock(Hash256), + Available(ExecutedBlock), } struct GossipBlobCache { @@ -36,12 +52,11 @@ struct GossipBlobCache { } impl DataAvailabilityChecker { - pub fn new(kzg: Arc, tx: Sender>) -> Self { + pub fn new(kzg: Option>) -> Self { Self { rpc_blob_cache: <_>::default(), gossip_blob_cache: <_>::default(), kzg, - tx, } } @@ -50,15 +65,25 @@ impl DataAvailabilityChecker { /// cached, verify the block and import it. /// /// This should only accept gossip verified blobs, so we should not have to worry about dupes. - pub fn put_blob(&self, blob: Arc>) -> Result<(), BlobCacheError> { + // return an enum here that may include the full block + pub fn put_blob( + &self, + blob: Arc>, + ) -> Result, AvailabilityCheckError> { + let verified = if let Some(kzg) = self.kzg.as_ref() { + validate_blob::( + kzg, + blob.blob.clone(), + blob.kzg_commitment.clone(), + blob.kzg_proof, + ) + .map_err(|e| AvailabilityCheckError::Kzg(e))? + } else { + false + // error wrong fork + }; + // TODO(remove clones) - let verified = validate_blob::( - &self.kzg, - blob.blob.clone(), - blob.kzg_commitment.clone(), - blob.kzg_proof, - ) - .map_err(|e| BlobCacheError::Kzg(e))?; if verified { let mut blob_cache = self.gossip_blob_cache.lock(); @@ -93,7 +118,6 @@ impl DataAvailabilityChecker { // send to reprocessing queue ? //TODO(sean) try_send? //TODO(sean) errors - self.tx.try_send(executed_block); } else { let _ = inner.executed_block.insert(executed_block); } @@ -110,38 +134,106 @@ impl DataAvailabilityChecker { self.rpc_blob_cache.write().insert(blob.id(), blob.clone()); } - Ok(()) + Ok(Availability::PendingBlobs(vec![])) } - pub fn put_block(&self, executed_block: ExecutedBlock) -> Result<(), BlobCacheError> { - let mut guard = self.gossip_blob_cache.lock(); - guard - .entry(executed_block.block_root) - .and_modify(|cache| { - let block: &SignedBeaconBlock = executed_block.block.as_block(); - if let Ok(block) = block.message_eip4844() { - let verified_commitments: Vec<_> = cache - .verified_blobs - .iter() - .map(|blob| blob.kzg_commitment) - .collect(); - if verified_commitments == block.body.blob_kzg_commitments.clone().to_vec() { - // send to reprocessing queue ? - //TODO(sean) errors - self.tx.try_send(executed_block.clone()); - } else { - let _ = cache.executed_block.insert(executed_block.clone()); - // log that we cached + // return an enum here that may include the full block + pub fn check_block_availability( + &self, + executed_block: ExecutedBlock, + ) -> Result, AvailabilityCheckError> { + let block_clone = executed_block.block.clone(); + + let availability = match block_clone { + BlockWrapper::Available(available_block) => Availability::Available(executed_block), + BlockWrapper::AvailabilityPending(block) => { + if let Ok(kzg_commitments) = block.message().body().blob_kzg_commitments() { + // first check if the blockwrapper contains blobs, if so, use those + + let mut guard = self.gossip_blob_cache.lock(); + let entry = guard.entry(executed_block.block_root); + + match entry { + Entry::Occupied(mut occupied_entry) => { + let cache: &mut GossipBlobCache = occupied_entry.get_mut(); + + let verified_commitments: Vec<_> = cache + .verified_blobs + .iter() + .map(|blob| blob.kzg_commitment) + .collect(); + if verified_commitments == kzg_commitments.clone().to_vec() { + let removed: GossipBlobCache = occupied_entry.remove(); + + let ExecutedBlock { + block: _, + block_root, + state, + parent_block, + parent_eth1_finalization_data, + confirmed_state_roots, + consensus_context, + payload_verification_outcome, + } = executed_block; + + let available_block = BlockWrapper::Available(AvailableBlock { + block, + blobs: VerifiedBlobs::Available(VariableList::new( + removed.verified_blobs, + )?), + }); + + let available_executed = ExecutedBlock { + block: available_block, + block_root, + state, + parent_block, + parent_eth1_finalization_data, + confirmed_state_roots, + consensus_context, + payload_verification_outcome, + }; + Availability::Available(available_executed) + } else { + let mut missing_blobs = Vec::with_capacity(kzg_commitments.len()); + for i in 0..kzg_commitments.len() { + if cache.verified_blobs.get(i).is_none() { + missing_blobs.push(BlobIdentifier { + block_root: executed_block.block_root, + index: i as u64, + }) + } + } + + //TODO(sean) add a check that missing blobs > 0 + + let _ = cache.executed_block.insert(executed_block.clone()); + // log that we cached the block? + Availability::PendingBlobs(missing_blobs) + } + } + Entry::Vacant(vacant_entry) => { + let mut blob_ids = Vec::with_capacity(kzg_commitments.len()); + for i in 0..kzg_commitments.len() { + blob_ids.push(BlobIdentifier { + block_root: executed_block.block_root, + index: i as u64, + }); + } + + vacant_entry.insert(GossipBlobCache { + verified_blobs: vec![], + executed_block: Some(executed_block), + }); + + Availability::PendingBlobs(blob_ids) + } } } else { - // log error + Availability::Available(executed_block) } - }) - .or_insert(GossipBlobCache { - verified_blobs: vec![], - executed_block: Some(executed_block), - }); - - Ok(()) + } + }; + Ok(availability) } } diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index f1779b45fb..1a0247f99a 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -54,9 +54,10 @@ pub mod validator_monitor; pub mod validator_pubkey_cache; pub use self::beacon_chain::{ - AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, BeaconStore, ChainSegmentResult, - CountUnrealized, ForkChoiceError, OverrideForkchoiceUpdate, ProduceBlockVerification, - StateSkipConfig, WhenSlotSkipped, INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON, + AttestationProcessingOutcome, AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, + BeaconStore, ChainSegmentResult, CountUnrealized, ForkChoiceError, OverrideForkchoiceUpdate, + ProduceBlockVerification, StateSkipConfig, WhenSlotSkipped, + INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON, INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON, MAXIMUM_GOSSIP_CLOCK_DISPARITY, }; pub use self::beacon_snapshot::BeaconSnapshot; diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index a784c67411..df2545adf1 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -1680,7 +1680,8 @@ where NotifyExecutionLayer::Yes, ) .await? - .into(); + .try_into() + .unwrap(); self.chain.recompute_head_at_current_slot().await; Ok(block_hash) } @@ -1698,7 +1699,8 @@ where NotifyExecutionLayer::Yes, ) .await? - .into(); + .try_into() + .unwrap(); self.chain.recompute_head_at_current_slot().await; Ok(block_hash) } diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 5d283eae61..41e08bfdca 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -874,14 +874,10 @@ where .ok_or("beacon_chain requires a slot clock")?, ) .shutdown_sender(context.executor.shutdown_sender()) - .block_importer_sender(tx) .build() .map_err(|e| format!("Failed to build beacon chain: {}", e))?; - let arc_chain = Arc::new(chain); - arc_chain.start_block_importer(rx); - - self.beacon_chain = Some(arc_chain); + self.beacon_chain = Some(Arc::new(chain)); self.beacon_chain_builder = None; // a beacon chain requires a timer diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index b9db262dd2..7d80649ce8 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -1,7 +1,7 @@ use crate::metrics; use beacon_chain::blob_verification::{AsBlock, BlockWrapper, IntoAvailableBlock}; use beacon_chain::validator_monitor::{get_block_delay_ms, timestamp_now}; -use beacon_chain::NotifyExecutionLayer; +use beacon_chain::{AvailabilityProcessingStatus, NotifyExecutionLayer}; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, CountUnrealized}; use eth2::types::SignedBlockContents; use lighthouse_network::PubsubMessage; @@ -89,7 +89,7 @@ pub async fn publish_block( ) .await { - Ok(root) => { + Ok(AvailabilityProcessingStatus::Imported(root)) => { info!( log, "Valid block from HTTP API"; @@ -140,6 +140,24 @@ pub async fn publish_block( Ok(()) } + Ok(AvailabilityProcessingStatus::PendingBlock(block_root)) => { + let msg = format!("Missing block with root {:?}", block_root); + error!( + log, + "Invalid block provided to HTTP API"; + "reason" => &msg + ); + Err(warp_utils::reject::broadcast_without_import(msg)) + } + Ok(AvailabilityProcessingStatus::PendingBlobs(blob_ids)) => { + let msg = format!("Missing blobs {:?}", blob_ids); + error!( + log, + "Invalid block provided to HTTP API"; + "reason" => &msg + ); + Err(warp_utils::reject::broadcast_without_import(msg)) + } Err(BlockError::BlockIsAlreadyKnown) => { info!( log, diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 3d56f53b02..a362efcf3f 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -1,6 +1,6 @@ use crate::{metrics, service::NetworkMessage, sync::SyncMessage}; -use beacon_chain::blob_verification::{AsBlock, BlockWrapper}; +use beacon_chain::blob_verification::{AsBlock, BlockWrapper, GossipVerifiedBlob}; use beacon_chain::store::Error; use beacon_chain::{ attestation_verification::{self, Error as AttnError, VerifiedAttestation}, @@ -9,8 +9,8 @@ use beacon_chain::{ observed_operations::ObservationOutcome, sync_committee_verification::{self, Error as SyncCommitteeError}, validator_monitor::get_block_delay_ms, - BeaconChainError, BeaconChainTypes, BlockError, CountUnrealized, ForkChoiceError, - GossipVerifiedBlock, NotifyExecutionLayer, + AvailabilityProcessingStatus, BeaconChainError, BeaconChainTypes, BlockError, CountUnrealized, + ForkChoiceError, GossipVerifiedBlock, NotifyExecutionLayer, }; use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource}; use operation_pool::ReceivedPreCapella; @@ -802,8 +802,7 @@ impl Worker { verified_block } - Err(BlockError::AvailabilityPending(_)) => { - //TODO(sean) think about what to do hereA + Err(BlockError::AvailabilityCheck(e)) => { todo!() } Err(BlockError::ParentUnknown(block)) => { @@ -965,6 +964,29 @@ impl Worker { } } + pub async fn process_gossip_verified_blob( + self, + peer_id: PeerId, + verified_blob: GossipVerifiedBlob, + reprocess_tx: mpsc::Sender>, + // This value is not used presently, but it might come in handy for debugging. + _seen_duration: Duration, + ) { + // TODO + match self + .chain + .process_blob(verified_blob.to_blob(), CountUnrealized::True) + .await + { + Ok(hash) => { + // block imported + } + Err(e) => { + // handle errors + } + } + } + /// Process the beacon block that has already passed gossip verification. /// /// Raises a log if there are errors. @@ -989,7 +1011,7 @@ impl Worker { ) .await { - Ok(block_root) => { + Ok(AvailabilityProcessingStatus::Imported(block_root)) => { metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL); if reprocess_tx @@ -1016,8 +1038,13 @@ impl Worker { self.chain.recompute_head_at_current_slot().await; } - Err(BlockError::AvailabilityPending(block_root)) => { + Ok(AvailabilityProcessingStatus::PendingBlock(block_root)) => { + // make rpc request for block + todo!() + } + Ok(AvailabilityProcessingStatus::PendingBlobs(blob_ids)) => { // make rpc request for blob + todo!() } Err(BlockError::ParentUnknown(block)) => { // Inform the sync manager to find parents for this block diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs index 5bbcb4e1f3..08c8200bc8 100644 --- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -8,7 +8,7 @@ use crate::metrics; use crate::sync::manager::{BlockProcessType, SyncMessage}; use crate::sync::{BatchProcessResult, ChainId}; use beacon_chain::blob_verification::{AsBlock, BlockWrapper, IntoAvailableBlock}; -use beacon_chain::CountUnrealized; +use beacon_chain::{AvailabilityProcessingStatus, CountUnrealized}; use beacon_chain::{ BeaconChainError, BeaconChainTypes, BlockError, ChainSegmentResult, HistoricalBlockError, NotifyExecutionLayer, @@ -100,7 +100,8 @@ impl Worker { metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL); // RPC block imported, regardless of process type - if let &Ok(hash) = &result { + //TODO(sean) handle pending availability variants + if let &Ok(AvailabilityProcessingStatus::Imported(hash)) = &result { info!(self.log, "New RPC block received"; "slot" => slot, "hash" => %hash); // Trigger processing for work referencing this block. diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 43921b585a..b9774ffa81 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -732,7 +732,7 @@ impl SyncManager { self.block_lookups.single_block_lookup_response( id, peer_id, - maybe_block.map(BlockWrapper::Block), + maybe_block.map(BlockWrapper::AvailabilityPending), seen_timestamp, &mut self.network, ) @@ -747,7 +747,7 @@ impl SyncManager { BlockOrBlob::Block(maybe_block) => self.block_lookups.parent_lookup_response( id, peer_id, - maybe_block.map(BlockWrapper::Block), + maybe_block.map(BlockWrapper::AvailabilityPending), seen_timestamp, &mut self.network, ), diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 718ff7f722..c833c2d345 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -38,6 +38,7 @@ use std::marker::PhantomData; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; +use types::blob_sidecar::BlobSidecarArcList; use types::consts::eip4844::MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS; use types::*; @@ -66,7 +67,7 @@ pub struct HotColdDB, Cold: ItemStore> { /// The hot database also contains all blocks. pub hot_db: Hot, /// LRU cache of deserialized blobs. Updated whenever a blob is loaded. - blob_cache: Mutex>>, + blob_cache: Mutex>>, /// LRU cache of deserialized blocks. Updated whenever a block is loaded. block_cache: Mutex>>, /// Chain spec. @@ -568,7 +569,11 @@ impl, Cold: ItemStore> HotColdDB blobs_db.key_delete(DBColumn::BeaconBlob.into(), block_root.as_bytes()) } - pub fn put_blobs(&self, block_root: &Hash256, blobs: BlobSidecarList) -> Result<(), Error> { + pub fn put_blobs( + &self, + block_root: &Hash256, + blobs: BlobSidecarArcList, + ) -> Result<(), Error> { let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db); blobs_db.put_bytes( DBColumn::BeaconBlob.into(), @@ -582,7 +587,7 @@ impl, Cold: ItemStore> HotColdDB pub fn blobs_as_kv_store_ops( &self, key: &Hash256, - blobs: &BlobSidecarList, + blobs: &BlobSidecarArcList, ops: &mut Vec, ) { let db_key = get_key_for_col(DBColumn::BeaconBlob.into(), key.as_bytes()); @@ -926,7 +931,7 @@ impl, Cold: ItemStore> HotColdDB let reverse_op = match op { StoreOp::PutBlobs(block_root, _) => StoreOp::DeleteBlobs(*block_root), StoreOp::DeleteBlobs(block_root) => match blobs_to_delete.pop() { - Some(blobs) => StoreOp::PutBlobs(*block_root, Arc::new(blobs)), + Some(blobs) => StoreOp::PutBlobs(*block_root, blobs), None => return Err(HotColdDBError::Rollback.into()), }, _ => return Err(HotColdDBError::Rollback.into()), @@ -972,7 +977,7 @@ impl, Cold: ItemStore> HotColdDB for op in blob_cache_ops { match op { StoreOp::PutBlobs(block_root, blobs) => { - guard_blob.put(block_root, (*blobs).clone()); + guard_blob.put(block_root, blobs.clone()); } StoreOp::DeleteBlobs(block_root) => { @@ -1320,12 +1325,12 @@ impl, Cold: ItemStore> HotColdDB } /// Fetch a blobs sidecar from the store. - pub fn get_blobs(&self, block_root: &Hash256) -> Result>, Error> { + pub fn get_blobs(&self, block_root: &Hash256) -> Result>, Error> { let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db); match blobs_db.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? { Some(ref blobs_bytes) => { - let blobs = BlobSidecarList::from_ssz_bytes(blobs_bytes)?; + let blobs = BlobSidecarArcList::from_ssz_bytes(blobs_bytes)?; // FIXME(sean) I was attempting to use a blob cache here but was getting deadlocks, // may want to attempt to use one again self.blob_cache.lock().put(*block_root, blobs.clone()); diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index ea382a934e..bcda5b97bf 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -43,6 +43,7 @@ pub use metrics::scrape_for_metrics; use parking_lot::MutexGuard; use std::sync::Arc; use strum::{EnumString, IntoStaticStr}; +use types::blob_sidecar::BlobSidecarArcList; pub use types::*; pub type ColumnIter<'a> = Box), Error>> + 'a>; @@ -159,7 +160,7 @@ pub trait ItemStore: KeyValueStore + Sync + Send + Sized + 'stati pub enum StoreOp<'a, E: EthSpec> { PutBlock(Hash256, Arc>), PutState(Hash256, &'a BeaconState), - PutBlobs(Hash256, Arc>), + PutBlobs(Hash256, BlobSidecarArcList), PutOrphanedBlobsKey(Hash256), PutStateSummary(Hash256, HotStateSummary), PutStateTemporaryFlag(Hash256), diff --git a/consensus/types/src/blob_sidecar.rs b/consensus/types/src/blob_sidecar.rs index 502e453aad..fbd0aebb12 100644 --- a/consensus/types/src/blob_sidecar.rs +++ b/consensus/types/src/blob_sidecar.rs @@ -7,6 +7,7 @@ use serde_derive::{Deserialize, Serialize}; use ssz::Encode; use ssz_derive::{Decode, Encode}; use ssz_types::VariableList; +use std::sync::Arc; use test_random_derive::TestRandom; use tree_hash_derive::TreeHash; @@ -48,6 +49,9 @@ pub struct BlobSidecar { } pub type BlobSidecarList = VariableList, ::MaxBlobsPerBlock>; +//TODO(sean) is there any other way around this? need it arc blobs for caching in multiple places +pub type BlobSidecarArcList = + VariableList>, ::MaxBlobsPerBlock>; pub type Blobs = VariableList, ::MaxExtraDataBytes>; impl SignedRoot for BlobSidecar {} diff --git a/consensus/types/src/signed_blob.rs b/consensus/types/src/signed_blob.rs index f9ae481247..6b2279ce89 100644 --- a/consensus/types/src/signed_blob.rs +++ b/consensus/types/src/signed_blob.rs @@ -1,9 +1,14 @@ -use crate::{test_utils::TestRandom, BlobSidecar, EthSpec, Signature}; +use crate::{ + test_utils::TestRandom, BlobSidecar, ChainSpec, Domain, EthSpec, Fork, Hash256, Signature, + SignedRoot, SigningData, +}; +use bls::PublicKey; use derivative::Derivative; use serde_derive::{Deserialize, Serialize}; use ssz_derive::{Decode, Encode}; use ssz_types::VariableList; use test_random_derive::TestRandom; +use tree_hash::TreeHash; use tree_hash_derive::TreeHash; #[derive( @@ -29,3 +34,37 @@ pub struct SignedBlobSidecar { pub type SignedBlobSidecarList = VariableList, ::MaxBlobsPerBlock>; + +impl SignedBlobSidecar { + /// Verify `self.signature`. + /// + /// If the root of `block.message` is already known it can be passed in via `object_root_opt`. + /// Otherwise, it will be computed locally. + pub fn verify_signature( + &self, + object_root_opt: Option, + pubkey: &PublicKey, + fork: &Fork, + genesis_validators_root: Hash256, + spec: &ChainSpec, + ) -> bool { + let domain = spec.get_domain( + self.message.slot.epoch(T::slots_per_epoch()), + Domain::BlobSidecar, + fork, + genesis_validators_root, + ); + + let message = if let Some(object_root) = object_root_opt { + SigningData { + object_root, + domain, + } + .tree_hash_root() + } else { + self.message.signing_root(domain) + }; + + self.signature.verify(pubkey, message) + } +}