Initial commit

This commit is contained in:
Pawan Dhananjay
2023-03-14 19:47:06 +05:30
parent 13b54f7879
commit 761df83597
4 changed files with 99 additions and 40 deletions

View File

@@ -7,12 +7,12 @@ use crate::attester_cache::{AttesterCache, AttesterCacheKey};
use crate::beacon_proposer_cache::compute_proposer_duties_from_head; use crate::beacon_proposer_cache::compute_proposer_duties_from_head;
use crate::beacon_proposer_cache::BeaconProposerCache; use crate::beacon_proposer_cache::BeaconProposerCache;
use crate::blob_cache::BlobCache; use crate::blob_cache::BlobCache;
use crate::blob_verification::{AsBlock, AvailabilityPendingBlock, AvailableBlock, BlobError, BlockWrapper, IntoAvailableBlock}; use crate::blob_verification::{AsBlock, AvailabilityPendingBlock, AvailableBlock, BlobError, BlockWrapper, IntoAvailableBlock, Blobs};
use crate::block_times_cache::BlockTimesCache; use crate::block_times_cache::BlockTimesCache;
use crate::block_verification::{ use crate::block_verification::{
check_block_is_finalized_checkpoint_or_descendant, check_block_relevancy, get_block_root, check_block_is_finalized_checkpoint_or_descendant, check_block_relevancy, get_block_root,
signature_verify_chain_segment, BlockError, ExecutionPendingBlock, GossipVerifiedBlock, signature_verify_chain_segment, BlockError, ExecutionPendingBlock, GossipVerifiedBlock,
IntoExecutionPendingBlock, PayloadVerificationOutcome, POS_PANDA_BANNER, IntoExecutionPendingBlock, PayloadVerificationOutcome, POS_PANDA_BANNER, ExecutedBlock,
}; };
pub use crate::canonical_head::{CanonicalHead, CanonicalHeadRwLock}; pub use crate::canonical_head::{CanonicalHead, CanonicalHeadRwLock};
use crate::chain_config::ChainConfig; use crate::chain_config::ChainConfig;
@@ -273,6 +273,11 @@ pub enum StateSkipConfig {
WithoutStateRoots, WithoutStateRoots,
} }
pub enum BlockProcessingResult<T: BeaconChainTypes> {
Verified(Hash256),
AvailabilityPending(ExecutedBlock<T>),
}
pub trait BeaconChainTypes: Send + Sync + 'static { pub trait BeaconChainTypes: Send + Sync + 'static {
type HotStore: store::ItemStore<Self::EthSpec>; type HotStore: store::ItemStore<Self::EthSpec>;
type ColdStore: store::ItemStore<Self::EthSpec>; type ColdStore: store::ItemStore<Self::EthSpec>;
@@ -2689,13 +2694,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// ///
/// Returns an `Err` if the given block was invalid, or an error was encountered during /// Returns an `Err` if the given block was invalid, or an error was encountered during
/// verification. /// verification.
pub async fn process_block<A: IntoAvailableBlock<T>, B: IntoExecutionPendingBlock<T, A>>( pub async fn process_block<B: IntoExecutionPendingBlock<T>>(
self: &Arc<Self>, self: &Arc<Self>,
block_root: Hash256, block_root: Hash256,
unverified_block: B, unverified_block: B,
count_unrealized: CountUnrealized, count_unrealized: CountUnrealized,
notify_execution_layer: NotifyExecutionLayer, notify_execution_layer: NotifyExecutionLayer,
) -> Result<Hash256, BlockError<T::EthSpec>> { ) -> Result<BlockProcessingResult<T>, BlockError<T::EthSpec>> {
// Start the Prometheus timer. // Start the Prometheus timer.
let _full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES); let _full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES);
@@ -2704,17 +2709,26 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let slot = unverified_block.block().slot(); let slot = unverified_block.block().slot();
// A small closure to group the verification and import errors.
let chain = self.clone();
let import_block = async move {
let execution_pending = unverified_block.into_execution_pending_block( let execution_pending = unverified_block.into_execution_pending_block(
block_root, block_root,
&chain, &chain,
notify_execution_layer, notify_execution_layer,
)?; )?;
chain
.import_execution_pending_block(execution_pending, count_unrealized) // TODO(log required errors)
.await let executed_block = self
.into_executed_block(execution_pending, count_unrealized)
.await?;
let chain = self.clone();
// Check if the executed block has all it's blobs available to qualify as a fully
// available block
let import_block = if let Ok(blobs) = self.gossip_blob_cache.lock().blobs(executed_block.block_root) {
self.import_available_block(executed_block, blobs, count_unrealized)
} else {
return Ok(BlockProcessingResult::AvailabilityPending(executed_block));
}; };
// Verify and import the block. // Verify and import the block.
@@ -2731,7 +2745,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Increment the Prometheus counter for block processing successes. // Increment the Prometheus counter for block processing successes.
metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES); metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES);
Ok(block_root) Ok(BlockProcessingResult::Verified(block_root))
} }
Err(e @ BlockError::BeaconChainError(BeaconChainError::TokioJoin(_))) => { Err(e @ BlockError::BeaconChainError(BeaconChainError::TokioJoin(_))) => {
debug!( debug!(
@@ -2763,16 +2777,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
} }
} }
/// Accepts a fully-verified block and imports it into the chain without performing any /// Accepts a fully-verified block and awaits on it's payload verification handle to
/// additional verification. /// get a fully `ExecutedBlock`
/// ///
/// An error is returned if the block was unable to be imported. It may be partially imported /// An error is returned if the verification handle couldn't be awaited.
/// (i.e., this function is not atomic). async fn into_executed_block(
async fn import_execution_pending_block<B: IntoAvailableBlock<T>>(
self: Arc<Self>, self: Arc<Self>,
execution_pending_block: ExecutionPendingBlock<T, B>, execution_pending_block: ExecutionPendingBlock<T>,
count_unrealized: CountUnrealized, count_unrealized: CountUnrealized,
) -> Result<Hash256, BlockError<T::EthSpec>> { ) -> Result<ExecutedBlock<T>, BlockError<T::EthSpec>> {
let ExecutionPendingBlock { let ExecutionPendingBlock {
block, block,
block_root, block_root,
@@ -2784,16 +2797,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
consensus_context, consensus_context,
} = execution_pending_block; } = execution_pending_block;
let PayloadVerificationOutcome { let payload_verification_outcome = payload_verification_handle
payload_verification_status,
is_valid_merge_transition_block,
} = payload_verification_handle
.await .await
.map_err(BeaconChainError::TokioJoin)? .map_err(BeaconChainError::TokioJoin)?
.ok_or(BeaconChainError::RuntimeShutdown)??; .ok_or(BeaconChainError::RuntimeShutdown)??;
// Log the PoS pandas if a merge transition just occurred. // Log the PoS pandas if a merge transition just occurred.
if is_valid_merge_transition_block { if payload_verification_outcome.is_valid_merge_transition_block {
info!(self.log, "{}", POS_PANDA_BANNER); info!(self.log, "{}", POS_PANDA_BANNER);
info!( info!(
self.log, self.log,
@@ -2821,10 +2831,48 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.into_root() .into_root()
); );
} }
Ok(ExecutedBlock {
block,
block_root,
state,
parent_block,
confirmed_state_roots,
parent_eth1_finalization_data,
consensus_context,
payload_verification_outcome
})
}
/// Accepts a fully-verified, available block and imports it into the chain without performing any
/// additional verification.
///
/// An error is returned if the block was unable to be imported. It may be partially imported
/// (i.e., this function is not atomic).
async fn import_available_block(
self: Arc<Self>,
executed_block: ExecutedBlock<T>,
blobs: Blobs<T::EthSpec>
count_unrealized: CountUnrealized,
) -> Result<Hash256, BlockError<T::EthSpec>> {
let ExecutedBlock {
block,
block_root,
state,
parent_block,
confirmed_state_roots,
payload_verification_outcome,
parent_eth1_finalization_data,
consensus_context,
} = execution_pending_block;
let available_block = block.into_available_block()?;
let chain = self.clone(); let chain = self.clone();
let available_block = AvailableBlock {
block: block,
blobs: blobs
};
let block_hash = self let block_hash = self
.spawn_blocking_handle( .spawn_blocking_handle(
move || { move || {

View File

@@ -658,11 +658,8 @@ type PayloadVerificationHandle<E> =
/// Note: a `ExecutionPendingBlock` is not _forever_ valid to be imported, it may later become invalid /// Note: a `ExecutionPendingBlock` is not _forever_ valid to be imported, it may later become invalid
/// due to finality or some other event. A `ExecutionPendingBlock` should be imported into the /// due to finality or some other event. A `ExecutionPendingBlock` should be imported into the
/// `BeaconChain` immediately after it is instantiated. /// `BeaconChain` immediately after it is instantiated.
pub struct ExecutionPendingBlock< pub struct ExecutionPendingBlock<T: BeaconChainTypes> {
T: BeaconChainTypes, pub block: Arc<SignedBeaconBlock<T::EthSpec>>,
B: IntoAvailablBlockk = AvailableBlock<T::EthSpec>,
> {
pub block: B,
pub block_root: Hash256, pub block_root: Hash256,
pub state: BeaconState<T::EthSpec>, pub state: BeaconState<T::EthSpec>,
pub parent_block: SignedBeaconBlock<T::EthSpec, BlindedPayload<T::EthSpec>>, pub parent_block: SignedBeaconBlock<T::EthSpec, BlindedPayload<T::EthSpec>>,
@@ -672,14 +669,21 @@ pub struct ExecutionPendingBlock<
pub payload_verification_handle: PayloadVerificationHandle<T::EthSpec>, pub payload_verification_handle: PayloadVerificationHandle<T::EthSpec>,
} }
pub struct ExecutedBlock<T: BeaconChainTypes> {
pub block: Arc<SignedBeaconBlock<T::EthSpec>>,
pub block_root: Hash256,
pub state: BeaconState<T::EthSpec>,
pub parent_block: SignedBeaconBlock<T::EthSpec, BlindedPayload<T::EthSpec>>,
pub parent_eth1_finalization_data: Eth1FinalizationData,
pub confirmed_state_roots: Vec<Hash256>,
pub consensus_context: ConsensusContext<T::EthSpec>,
pub payload_verification_outcome: PayloadVerificationOutcome,
}
/// Implemented on types that can be converted into a `ExecutionPendingBlock`. /// Implemented on types that can be converted into a `ExecutionPendingBlock`.
/// ///
/// Used to allow functions to accept blocks at various stages of verification. /// Used to allow functions to accept blocks at various stages of verification.
pub trait IntoExecutionPendingBlock< pub trait IntoExecutionPendingBlock<T: BeaconChainTypes>: Sized {
T: BeaconChainTypes,
B: IntoAvailableBlock<T> = AvailableBlock<T::EthSpec>,
>: Sized
{
fn into_execution_pending_block( fn into_execution_pending_block(
self, self,
block_root: Hash256, block_root: Hash256,

View File

@@ -985,7 +985,7 @@ impl<T: BeaconChainTypes> Worker<T> {
) )
.await .await
{ {
Ok(block_root) => { Ok(BlockProcessingResult::Verified(block_root)) => {
metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL); metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL);
if reprocess_tx if reprocess_tx
@@ -1012,6 +1012,9 @@ impl<T: BeaconChainTypes> Worker<T> {
self.chain.recompute_head_at_current_slot().await; self.chain.recompute_head_at_current_slot().await;
} }
Ok(BlockProcessingResult::AvailabilityPending(executed_block)) => {
// cache in blob cache and make rpc request for blob
}
Err(BlockError::ParentUnknown(block)) => { Err(BlockError::ParentUnknown(block)) => {
// Inform the sync manager to find parents for this block // Inform the sync manager to find parents for this block
// This should not occur. It should be checked by `should_forward_block` // This should not occur. It should be checked by `should_forward_block`

View File

@@ -91,7 +91,7 @@ impl<T: BeaconChainTypes> Worker<T> {
.map_err(BlockError::BlobValidation); .map_err(BlockError::BlobValidation);
let result = match available_block { let result = match available_block {
Ok(block) => { Ok(BlockProcessingResult::Verified(block)) => {
self.chain self.chain
.process_block( .process_block(
block_root, block_root,
@@ -101,6 +101,10 @@ impl<T: BeaconChainTypes> Worker<T> {
) )
.await .await
} }
Ok(BlockProcessingResult::AvailabilityPending(executed_block)) => {
// Shouldn't happen as sync should only send blocks for processing
// after sending blocks into the availability cache.
}
Err(e) => Err(e), Err(e) => Err(e),
}; };