everything, everywhere, all at once

This commit is contained in:
realbigsean
2023-03-17 16:12:40 -04:00
parent 98babb2e67
commit 05db0d2ba3
17 changed files with 453 additions and 253 deletions

View File

@@ -25,7 +25,7 @@ use crate::eth1_finalization_cache::{Eth1FinalizationCache, Eth1FinalizationData
use crate::events::ServerSentEventHandler;
use crate::execution_payload::{get_execution_payload, NotifyExecutionLayer, PreparePayloadHandle};
use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx, ForkChoiceWaitResult};
use crate::gossip_blob_cache::DataAvailabilityChecker;
use crate::gossip_blob_cache::{Availability, AvailabilityCheckError, DataAvailabilityChecker};
use crate::head_tracker::HeadTracker;
use crate::historical_blocks::HistoricalBlockError;
use crate::kzg_utils;
@@ -116,7 +116,7 @@ use tokio::task::JoinHandle;
use tree_hash::TreeHash;
use types::beacon_block_body::KzgCommitments;
use types::beacon_state::CloneConfig;
use types::blob_sidecar::Blobs;
use types::blob_sidecar::{BlobIdentifier, BlobSidecarArcList, Blobs};
use types::consts::eip4844::MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS;
use types::consts::merge::INTERVALS_PER_SLOT;
use types::*;
@@ -189,6 +189,25 @@ pub enum WhenSlotSkipped {
Prev,
}
#[derive(Debug)]
pub enum AvailabilityProcessingStatus {
PendingBlobs(Vec<BlobIdentifier>),
PendingBlock(Hash256),
Imported(Hash256),
}
//TODO(sean) using this in tests for now
impl TryInto<SignedBeaconBlockHash> for AvailabilityProcessingStatus {
type Error = ();
fn try_into(self) -> Result<SignedBeaconBlockHash, Self::Error> {
match self {
AvailabilityProcessingStatus::Imported(hash) => Ok(hash.into()),
_ => Err(()),
}
}
}
/// The result of a chain segment processing.
pub enum ChainSegmentResult<T: EthSpec> {
/// Processing this chain segment finished successfully.
@@ -445,7 +464,7 @@ pub struct BeaconChain<T: BeaconChainTypes> {
/// Provides monitoring of a set of explicitly defined validators.
pub validator_monitor: RwLock<ValidatorMonitor<T::EthSpec>>,
pub proposal_blob_cache: BlobCache<T::EthSpec>,
pub data_availability_checker: Option<DataAvailabilityChecker<T::EthSpec>>,
pub data_availability_checker: DataAvailabilityChecker<T::EthSpec>,
pub kzg: Option<Arc<Kzg>>,
}
@@ -1061,7 +1080,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn get_blobs(
&self,
block_root: &Hash256,
) -> Result<Option<BlobSidecarList<T::EthSpec>>, Error> {
) -> Result<Option<BlobSidecarArcList<T::EthSpec>>, Error> {
Ok(self.store.get_blobs(block_root)?)
}
@@ -2635,6 +2654,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map_err(BeaconChainError::TokioJoin)?
}
pub async fn process_blob(
self: &Arc<Self>,
blob: BlobSidecar<T::EthSpec>,
count_unrealized: CountUnrealized,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
self.check_availability_and_maybe_import(
|chain| chain.data_availability_checker.put_blob(Arc::new(blob)),
count_unrealized,
)
.await
}
/// Returns `Ok(block_root)` if the given `unverified_block` was successfully verified and
/// imported into the chain.
///
@@ -2653,7 +2684,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
unverified_block: B,
count_unrealized: CountUnrealized,
notify_execution_layer: NotifyExecutionLayer,
) -> Result<Hash256, BlockError<T::EthSpec>> {
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
// Start the Prometheus timer.
let _full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES);
@@ -2673,65 +2704,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let executed_block = self
.clone()
.into_executed_block(execution_pending, count_unrealized)
.await?;
.await
.map_err(|e| self.handle_block_error(e))?;
// Check if the executed block has all it's blobs available to qualify as a fully
// available block
let import_block = if let Some(da_checker) = self.data_availability_checker.as_ref() {
da_checker.put_block(executed_block); //TODO(sean) errors
return Err(BlockError::AvailabilityPending(block_root));
} else {
self.clone().import_available_block(
executed_block,
VerifiedBlobs::PreEip4844,
count_unrealized,
)
};
// Verify and import the block.
match import_block.await {
// The block was successfully verified and imported. Yay.
Ok(block_root) => {
trace!(
self.log,
"Beacon block imported";
"block_root" => ?block_root,
"block_slot" => slot,
);
// Increment the Prometheus counter for block processing successes.
metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES);
Ok(block_root)
}
Err(e @ BlockError::BeaconChainError(BeaconChainError::TokioJoin(_))) => {
debug!(
self.log,
"Beacon block processing cancelled";
"error" => ?e,
);
Err(e)
}
// There was an error whilst attempting to verify and import the block. The block might
// be partially verified or partially imported.
Err(BlockError::BeaconChainError(e)) => {
crit!(
self.log,
"Beacon block processing error";
"error" => ?e,
);
Err(BlockError::BeaconChainError(e))
}
// The block failed verification.
Err(other) => {
trace!(
self.log,
"Beacon block rejected";
"reason" => other.to_string(),
);
Err(other)
}
}
self.check_availability_and_maybe_import(
|chain| {
chain
.data_availability_checker
.check_block_availability(executed_block)
},
count_unrealized,
)
.await
}
/// Accepts a fully-verified block and awaits on it's payload verification handle to
@@ -2800,55 +2784,118 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
})
}
fn handle_block_error(&self, e: BlockError<T::EthSpec>) -> BlockError<T::EthSpec> {
match e {
e @ BlockError::BeaconChainError(BeaconChainError::TokioJoin(_)) => {
debug!(
self.log,
"Beacon block processing cancelled";
"error" => ?e,
);
e
}
BlockError::BeaconChainError(e) => {
crit!(
self.log,
"Beacon block processing error";
"error" => ?e,
);
BlockError::BeaconChainError(e)
}
other => {
trace!(
self.log,
"Beacon block rejected";
"reason" => other.to_string(),
);
other
}
}
}
/// Accepts a fully-verified, available block and imports it into the chain without performing any
/// additional verification.
///
/// An error is returned if the block was unable to be imported. It may be partially imported
/// (i.e., this function is not atomic).
async fn import_available_block(
self: Arc<Self>,
executed_block: ExecutedBlock<T::EthSpec>,
blobs: VerifiedBlobs<T::EthSpec>,
async fn check_availability_and_maybe_import(
self: &Arc<Self>,
cache_fn: impl FnOnce(Arc<Self>) -> Result<Availability<T::EthSpec>, AvailabilityCheckError>,
count_unrealized: CountUnrealized,
) -> Result<Hash256, BlockError<T::EthSpec>> {
let ExecutedBlock {
block,
block_root,
state,
parent_block,
confirmed_state_roots,
payload_verification_outcome,
parent_eth1_finalization_data,
consensus_context,
} = executed_block;
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
let availability = cache_fn(self.clone())?;
match availability {
Availability::Available(block) => {
let ExecutedBlock {
block,
block_root,
state,
parent_block,
parent_eth1_finalization_data,
confirmed_state_roots,
consensus_context,
payload_verification_outcome,
} = block;
let chain = self.clone();
let available_block = match block {
BlockWrapper::Available(block) => block,
BlockWrapper::AvailabilityPending(_) => {
todo!() // logic error
}
};
let available_block = AvailableBlock {
block: block.block_cloned(),
blobs: blobs,
};
let slot = available_block.block.slot();
let block_hash = self
.spawn_blocking_handle(
move || {
chain.import_block(
available_block,
block_root,
state,
confirmed_state_roots,
payload_verification_outcome.payload_verification_status,
count_unrealized,
parent_block,
parent_eth1_finalization_data,
consensus_context,
// import
let chain = self.clone();
let result = self
.spawn_blocking_handle(
move || {
chain.import_block(
available_block,
block_root,
state,
confirmed_state_roots,
payload_verification_outcome.payload_verification_status,
count_unrealized,
parent_block,
parent_eth1_finalization_data,
consensus_context,
)
},
"payload_verification_handle",
)
},
"payload_verification_handle",
)
.await??;
.await
.map_err(|e| {
let b = BlockError::from(e);
self.handle_block_error(b)
})?;
Ok(block_hash)
match result {
// The block was successfully verified and imported. Yay.
Ok(block_root) => {
trace!(
self.log,
"Beacon block imported";
"block_root" => ?block_root,
"block_slot" => slot,
);
// Increment the Prometheus counter for block processing successes.
metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES);
Ok(AvailabilityProcessingStatus::Imported(block_root))
}
Err(e) => Err(self.handle_block_error(e)),
}
}
Availability::PendingBlock(block_root) => {
Ok(AvailabilityProcessingStatus::PendingBlock(block_root))
}
Availability::PendingBlobs(blob_ids) => {
Ok(AvailabilityProcessingStatus::PendingBlobs(blob_ids))
}
}
}
/// Accepts a fully-verified and available block and imports it into the chain without performing any
@@ -6154,52 +6201,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map(|fork_epoch| fork_epoch <= current_epoch)
.unwrap_or(false))
}
pub fn start_block_importer(
self: &Arc<Self>,
mut rx: tokio::sync::mpsc::Receiver<ExecutedBlock<T::EthSpec>>,
) {
let chain = self.clone();
self.task_executor.spawn(
async move {
while let Some(block) = rx.recv().await {
let ExecutedBlock {
block,
block_root,
state,
parent_block,
parent_eth1_finalization_data,
confirmed_state_roots,
consensus_context,
payload_verification_outcome,
} = block;
let available_block = block.into_available_block().unwrap(); //TODO(sean) remove unwrap
let chain_inner = chain.clone();
let block_hash = chain
.spawn_blocking_handle(
move || {
chain_inner.import_block(
available_block,
block_root,
state,
confirmed_state_roots,
payload_verification_outcome.payload_verification_status,
CountUnrealized::True, //TODO(sean)
parent_block,
parent_eth1_finalization_data,
consensus_context,
)
},
"block_importer",
)
.await;
}
},
"block_importer_listener",
);
}
}
impl<T: BeaconChainTypes> Drop for BeaconChain<T> {