diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 584774d72a..d403cbc597 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -7,12 +7,12 @@ use crate::attester_cache::{AttesterCache, AttesterCacheKey}; use crate::beacon_proposer_cache::compute_proposer_duties_from_head; use crate::beacon_proposer_cache::BeaconProposerCache; use crate::blob_cache::BlobCache; -use crate::blob_verification::{AsBlock, AvailabilityPendingBlock, AvailableBlock, BlobError, BlockWrapper, IntoAvailableBlock}; +use crate::blob_verification::{AsBlock, AvailabilityPendingBlock, AvailableBlock, BlobError, BlockWrapper, IntoAvailableBlock, Blobs}; use crate::block_times_cache::BlockTimesCache; use crate::block_verification::{ check_block_is_finalized_checkpoint_or_descendant, check_block_relevancy, get_block_root, signature_verify_chain_segment, BlockError, ExecutionPendingBlock, GossipVerifiedBlock, - IntoExecutionPendingBlock, PayloadVerificationOutcome, POS_PANDA_BANNER, + IntoExecutionPendingBlock, PayloadVerificationOutcome, POS_PANDA_BANNER, ExecutedBlock, }; pub use crate::canonical_head::{CanonicalHead, CanonicalHeadRwLock}; use crate::chain_config::ChainConfig; @@ -273,6 +273,11 @@ pub enum StateSkipConfig { WithoutStateRoots, } +pub enum BlockProcessingResult { + Verified(Hash256), + AvailabilityPending(ExecutedBlock), +} + pub trait BeaconChainTypes: Send + Sync + 'static { type HotStore: store::ItemStore; type ColdStore: store::ItemStore; @@ -2689,13 +2694,13 @@ impl BeaconChain { /// /// Returns an `Err` if the given block was invalid, or an error was encountered during /// verification. - pub async fn process_block, B: IntoExecutionPendingBlock>( + pub async fn process_block>( self: &Arc, block_root: Hash256, unverified_block: B, count_unrealized: CountUnrealized, notify_execution_layer: NotifyExecutionLayer, - ) -> Result> { + ) -> Result, BlockError> { // Start the Prometheus timer. let _full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES); @@ -2704,17 +2709,26 @@ impl BeaconChain { let slot = unverified_block.block().slot(); - // A small closure to group the verification and import errors. + + let execution_pending = unverified_block.into_execution_pending_block( + block_root, + &chain, + notify_execution_layer, + )?; + + // TODO(log required errors) + let executed_block = self + .into_executed_block(execution_pending, count_unrealized) + .await?; + let chain = self.clone(); - let import_block = async move { - let execution_pending = unverified_block.into_execution_pending_block( - block_root, - &chain, - notify_execution_layer, - )?; - chain - .import_execution_pending_block(execution_pending, count_unrealized) - .await + + // Check if the executed block has all it's blobs available to qualify as a fully + // available block + let import_block = if let Ok(blobs) = self.gossip_blob_cache.lock().blobs(executed_block.block_root) { + self.import_available_block(executed_block, blobs, count_unrealized) + } else { + return Ok(BlockProcessingResult::AvailabilityPending(executed_block)); }; // Verify and import the block. @@ -2731,7 +2745,7 @@ impl BeaconChain { // Increment the Prometheus counter for block processing successes. metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES); - Ok(block_root) + Ok(BlockProcessingResult::Verified(block_root)) } Err(e @ BlockError::BeaconChainError(BeaconChainError::TokioJoin(_))) => { debug!( @@ -2763,16 +2777,15 @@ impl BeaconChain { } } - /// Accepts a fully-verified block and imports it into the chain without performing any - /// additional verification. + /// Accepts a fully-verified block and awaits on it's payload verification handle to + /// get a fully `ExecutedBlock` /// - /// An error is returned if the block was unable to be imported. It may be partially imported - /// (i.e., this function is not atomic). - async fn import_execution_pending_block>( + /// An error is returned if the verification handle couldn't be awaited. + async fn into_executed_block( self: Arc, - execution_pending_block: ExecutionPendingBlock, + execution_pending_block: ExecutionPendingBlock, count_unrealized: CountUnrealized, - ) -> Result> { + ) -> Result, BlockError> { let ExecutionPendingBlock { block, block_root, @@ -2784,16 +2797,13 @@ impl BeaconChain { consensus_context, } = execution_pending_block; - let PayloadVerificationOutcome { - payload_verification_status, - is_valid_merge_transition_block, - } = payload_verification_handle + let payload_verification_outcome = payload_verification_handle .await .map_err(BeaconChainError::TokioJoin)? .ok_or(BeaconChainError::RuntimeShutdown)??; // Log the PoS pandas if a merge transition just occurred. - if is_valid_merge_transition_block { + if payload_verification_outcome.is_valid_merge_transition_block { info!(self.log, "{}", POS_PANDA_BANNER); info!( self.log, @@ -2821,10 +2831,48 @@ impl BeaconChain { .into_root() ); } + Ok(ExecutedBlock { + block, + block_root, + state, + parent_block, + confirmed_state_roots, + parent_eth1_finalization_data, + consensus_context, + payload_verification_outcome + }) + } + + /// Accepts a fully-verified, available block and imports it into the chain without performing any + /// additional verification. + /// + /// An error is returned if the block was unable to be imported. It may be partially imported + /// (i.e., this function is not atomic). + async fn import_available_block( + self: Arc, + executed_block: ExecutedBlock, + blobs: Blobs + count_unrealized: CountUnrealized, + ) -> Result> { + let ExecutedBlock { + block, + block_root, + state, + parent_block, + confirmed_state_roots, + payload_verification_outcome, + parent_eth1_finalization_data, + consensus_context, + } = execution_pending_block; - let available_block = block.into_available_block()?; let chain = self.clone(); + + let available_block = AvailableBlock { + block: block, + blobs: blobs + }; + let block_hash = self .spawn_blocking_handle( move || { diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 595acf1b4d..1e420f8a4b 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -658,11 +658,8 @@ type PayloadVerificationHandle = /// Note: a `ExecutionPendingBlock` is not _forever_ valid to be imported, it may later become invalid /// due to finality or some other event. A `ExecutionPendingBlock` should be imported into the /// `BeaconChain` immediately after it is instantiated. -pub struct ExecutionPendingBlock< - T: BeaconChainTypes, - B: IntoAvailablBlockk = AvailableBlock, -> { - pub block: B, +pub struct ExecutionPendingBlock { + pub block: Arc>, pub block_root: Hash256, pub state: BeaconState, pub parent_block: SignedBeaconBlock>, @@ -672,14 +669,21 @@ pub struct ExecutionPendingBlock< pub payload_verification_handle: PayloadVerificationHandle, } +pub struct ExecutedBlock { + pub block: Arc>, + pub block_root: Hash256, + pub state: BeaconState, + pub parent_block: SignedBeaconBlock>, + pub parent_eth1_finalization_data: Eth1FinalizationData, + pub confirmed_state_roots: Vec, + pub consensus_context: ConsensusContext, + pub payload_verification_outcome: PayloadVerificationOutcome, +} + /// Implemented on types that can be converted into a `ExecutionPendingBlock`. /// /// Used to allow functions to accept blocks at various stages of verification. -pub trait IntoExecutionPendingBlock< - T: BeaconChainTypes, - B: IntoAvailableBlock = AvailableBlock, ->: Sized -{ +pub trait IntoExecutionPendingBlock: Sized { fn into_execution_pending_block( self, block_root: Hash256, diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 15f9c5e29e..4d7c949dc0 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -985,7 +985,7 @@ impl Worker { ) .await { - Ok(block_root) => { + Ok(BlockProcessingResult::Verified(block_root)) => { metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL); if reprocess_tx @@ -1012,6 +1012,9 @@ impl Worker { self.chain.recompute_head_at_current_slot().await; } + Ok(BlockProcessingResult::AvailabilityPending(executed_block)) => { + // cache in blob cache and make rpc request for blob + } Err(BlockError::ParentUnknown(block)) => { // Inform the sync manager to find parents for this block // This should not occur. It should be checked by `should_forward_block` diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs index 8d5bd53aea..093e69729b 100644 --- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -91,7 +91,7 @@ impl Worker { .map_err(BlockError::BlobValidation); let result = match available_block { - Ok(block) => { + Ok(BlockProcessingResult::Verified(block)) => { self.chain .process_block( block_root, @@ -101,6 +101,10 @@ impl Worker { ) .await } + Ok(BlockProcessingResult::AvailabilityPending(executed_block)) => { + // Shouldn't happen as sync should only send blocks for processing + // after sending blocks into the availability cache. + } Err(e) => Err(e), };