Lenient duplicate checks on HTTP API for block publication (#5574)

* start splitting gossip verification

* WIP

* Gossip verify separate (#7)

* save

* save

* make ProvenancedBlock concrete

* delete into gossip verified block contents

* get rid of IntoBlobSidecar trait

* remove IntoGossipVerified trait

* get tests compiling

* don't check sidecar slashability in publish

* remove second publish closure

* drop blob bool. also prefer using message index over index of position in list

* Merge remote-tracking branch 'origin/unstable' into gossip-verify-separate

* Fix low-hanging tests

* Fix tests and clean up

* Clean up imports

* more cleanup

* Merge remote-tracking branch 'origin/unstable' into gossip-verify-separate

* Further refine behaviour and add tests

* Merge remote-tracking branch 'origin/unstable' into gossip-verify-separate

* Merge remote-tracking branch 'origin/unstable' into gossip-verify-separate

* Remove empty line

* Fix test (block is not fully imported just gossip verified)

* Merge remote-tracking branch 'origin/unstable' into gossip-verify-separate

* Update for unstable & use empty blob list

* Update comment

* Add test for duplicate block case

* Merge remote-tracking branch 'origin/unstable' into gossip-verify-separate

* Clarify unreachable case

* Fix another publish_block case

* Remove unreachable case in filter chain segment

* Revert unrelated blob optimisation

* Merge remote-tracking branch 'origin/unstable' into gossip-verify-separate

* Merge remote-tracking branch 'origin/unstable' into gossip-verify-separate

* Fix merge conflicts

* Merge remote-tracking branch 'origin/unstable' into gossip-verify-separate

* Fix some compilation issues. Impl is fucked though

* Support peerDAS

* Fix tests

* Merge remote-tracking branch 'origin/unstable' into gossip-verify-separate

* Fix conflict

* Merge remote-tracking branch 'origin/unstable' into gossip-verify-separate

* Address review comments

* Merge remote-tracking branch 'origin/unstable' into gossip-verify-separate
This commit is contained in:
Michael Sproul
2024-09-24 14:52:44 +10:00
committed by GitHub
parent 1447eeb40b
commit 2792705331
21 changed files with 1071 additions and 516 deletions

View File

@@ -2740,7 +2740,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// If the block is relevant, add it to the filtered chain segment.
Ok(_) => filtered_chain_segment.push((block_root, block)),
// If the block is already known, simply ignore this block.
Err(BlockError::BlockIsAlreadyKnown(_)) => continue,
//
// Note that `check_block_relevancy` is incapable of returning
// `DuplicateImportStatusUnknown` so we don't need to handle that case here.
Err(BlockError::DuplicateFullyImported(_)) => continue,
// If the block is the genesis block, simply ignore this block.
Err(BlockError::GenesisBlock) => continue,
// If the block is is for a finalized slot, simply ignore this block.
@@ -2886,7 +2889,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}
}
Err(BlockError::BlockIsAlreadyKnown(block_root)) => {
Err(BlockError::DuplicateFullyImported(block_root)) => {
debug!(self.log,
"Ignoring already known blocks while processing chain segment";
"block_root" => ?block_root);
@@ -2977,6 +2980,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub async fn process_gossip_blob(
self: &Arc<Self>,
blob: GossipVerifiedBlob<T>,
publish_fn: impl FnOnce() -> Result<(), BlockError>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
let block_root = blob.block_root();
@@ -2987,7 +2991,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.fork_choice_read_lock()
.contains_block(&block_root)
{
return Err(BlockError::BlockIsAlreadyKnown(blob.block_root()));
return Err(BlockError::DuplicateFullyImported(blob.block_root()));
}
// No need to process and import blobs beyond the PeerDAS epoch.
@@ -3003,7 +3007,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}
let r = self.check_gossip_blob_availability_and_import(blob).await;
let r = self
.check_gossip_blob_availability_and_import(blob, publish_fn)
.await;
self.remove_notified(&block_root, r)
}
@@ -3012,6 +3018,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub async fn process_gossip_data_columns(
self: &Arc<Self>,
data_columns: Vec<GossipVerifiedDataColumn<T>>,
publish_fn: impl FnOnce() -> Result<(), BlockError>,
) -> Result<
(
AvailabilityProcessingStatus,
@@ -3037,11 +3044,16 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.fork_choice_read_lock()
.contains_block(&block_root)
{
return Err(BlockError::BlockIsAlreadyKnown(block_root));
return Err(BlockError::DuplicateFullyImported(block_root));
}
let r = self
.check_gossip_data_columns_availability_and_import(slot, block_root, data_columns)
.check_gossip_data_columns_availability_and_import(
slot,
block_root,
data_columns,
publish_fn,
)
.await;
self.remove_notified_custody_columns(&block_root, r)
}
@@ -3061,7 +3073,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.fork_choice_read_lock()
.contains_block(&block_root)
{
return Err(BlockError::BlockIsAlreadyKnown(block_root));
return Err(BlockError::DuplicateFullyImported(block_root));
}
// Reject RPC blobs referencing unknown parents. Otherwise we allow potentially invalid data
@@ -3127,7 +3139,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.fork_choice_read_lock()
.contains_block(&block_root)
{
return Err(BlockError::BlockIsAlreadyKnown(block_root));
return Err(BlockError::DuplicateFullyImported(block_root));
}
// Reject RPC columns referencing unknown parents. Otherwise we allow potentially invalid data
@@ -3225,7 +3237,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
unverified_block: B,
notify_execution_layer: NotifyExecutionLayer,
block_source: BlockImportSource,
publish_fn: impl FnOnce() -> Result<(), BlockError> + Send + 'static,
publish_fn: impl FnOnce() -> Result<(), BlockError>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
// Start the Prometheus timer.
let _full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES);
@@ -3407,7 +3419,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let availability = self
.data_availability_checker
.put_pending_executed_block(block)?;
self.process_availability(slot, availability).await
self.process_availability(slot, availability, || Ok(()))
.await
}
/// Checks if the provided blob can make any cached blocks available, and imports immediately
@@ -3415,6 +3428,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
async fn check_gossip_blob_availability_and_import(
self: &Arc<Self>,
blob: GossipVerifiedBlob<T>,
publish_fn: impl FnOnce() -> Result<(), BlockError>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
let slot = blob.slot();
if let Some(slasher) = self.slasher.as_ref() {
@@ -3422,7 +3436,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
let availability = self.data_availability_checker.put_gossip_blob(blob)?;
self.process_availability(slot, availability).await
self.process_availability(slot, availability, publish_fn)
.await
}
/// Checks if the provided data column can make any cached blocks available, and imports immediately
@@ -3432,6 +3447,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
slot: Slot,
block_root: Hash256,
data_columns: Vec<GossipVerifiedDataColumn<T>>,
publish_fn: impl FnOnce() -> Result<(), BlockError>,
) -> Result<
(
AvailabilityProcessingStatus,
@@ -3449,7 +3465,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.data_availability_checker
.put_gossip_data_columns(slot, block_root, data_columns)?;
self.process_availability(slot, availability)
self.process_availability(slot, availability, publish_fn)
.await
.map(|result| (result, data_columns_to_publish))
}
@@ -3490,7 +3506,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.data_availability_checker
.put_rpc_blobs(block_root, epoch, blobs)?;
self.process_availability(slot, availability).await
self.process_availability(slot, availability, || Ok(()))
.await
}
/// Checks if the provided columns can make any cached blocks available, and imports immediately
@@ -3538,7 +3555,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
custody_columns,
)?;
self.process_availability(slot, availability)
self.process_availability(slot, availability, || Ok(()))
.await
.map(|result| (result, data_columns_to_publish))
}
@@ -3551,9 +3568,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self: &Arc<Self>,
slot: Slot,
availability: Availability<T::EthSpec>,
publish_fn: impl FnOnce() -> Result<(), BlockError>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
match availability {
Availability::Available(block) => {
publish_fn()?;
// Block is fully available, import into fork choice
self.import_available_block(block).await
}

View File

@@ -49,14 +49,10 @@
#![allow(clippy::result_large_err)]
use crate::beacon_snapshot::PreProcessingSnapshot;
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob, GossipVerifiedBlobList};
use crate::block_verification_types::{
AsBlock, BlockContentsError, BlockImportData, GossipVerifiedBlockContents, RpcBlock,
};
use crate::blob_verification::GossipBlobError;
use crate::block_verification_types::{AsBlock, BlockImportData, RpcBlock};
use crate::data_availability_checker::{AvailabilityCheckError, MaybeAvailableBlock};
use crate::data_column_verification::{
GossipDataColumnError, GossipVerifiedDataColumn, GossipVerifiedDataColumnList,
};
use crate::data_column_verification::GossipDataColumnError;
use crate::eth1_finalization_cache::Eth1FinalizationData;
use crate::execution_payload::{
is_optimistic_candidate_block, validate_execution_payload_for_gossip, validate_merge_block,
@@ -71,7 +67,7 @@ use crate::{
metrics, BeaconChain, BeaconChainError, BeaconChainTypes,
};
use derivative::Derivative;
use eth2::types::{BlockGossip, EventKind, PublishBlockRequest};
use eth2::types::{BlockGossip, EventKind};
use execution_layer::PayloadStatus;
pub use fork_choice::{AttestationFromBlock, PayloadVerificationStatus};
use lighthouse_metrics::TryExt;
@@ -82,7 +78,6 @@ use slog::{debug, error, warn, Logger};
use slot_clock::SlotClock;
use ssz::Encode;
use ssz_derive::{Decode, Encode};
use ssz_types::VariableList;
use state_processing::per_block_processing::{errors::IntoWithIndex, is_merge_transition_block};
use state_processing::{
block_signature_verifier::{BlockSignatureVerifier, Error as BlockSignatureVerifierError},
@@ -98,14 +93,12 @@ use std::io::Write;
use std::sync::Arc;
use store::{Error as DBError, HotStateSummary, KeyValueStore, StoreOp};
use task_executor::JoinHandle;
use types::data_column_sidecar::DataColumnSidecarError;
use types::{
BeaconBlockRef, BeaconState, BeaconStateError, BlobsList, ChainSpec, DataColumnSubnetId, Epoch,
EthSpec, ExecutionBlockHash, FullPayload, Hash256, InconsistentFork, KzgProofs, PublicKey,
PublicKeyBytes, RelativeEpoch, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockHeader,
Slot,
data_column_sidecar::DataColumnSidecarError, BeaconBlockRef, BeaconState, BeaconStateError,
BlobsList, ChainSpec, DataColumnSidecarList, Epoch, EthSpec, ExecPayload, ExecutionBlockHash,
FullPayload, Hash256, InconsistentFork, PublicKey, PublicKeyBytes, RelativeEpoch,
SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
};
use types::{BlobSidecar, ExecPayload};
pub const POS_PANDA_BANNER: &str = r#"
,,, ,,, ,,, ,,,
@@ -187,12 +180,18 @@ pub enum BlockError {
/// It's unclear if this block is valid, but it conflicts with finality and shouldn't be
/// imported.
NotFinalizedDescendant { block_parent_root: Hash256 },
/// Block is already known, no need to re-import.
/// Block is already known and valid, no need to re-import.
///
/// ## Peer scoring
///
/// The block is valid and we have already imported a block with this hash.
BlockIsAlreadyKnown(Hash256),
DuplicateFullyImported(Hash256),
/// Block has already been seen on gossip but has not necessarily finished being imported.
///
/// ## Peer scoring
///
/// The block could be valid, or invalid. We don't know.
DuplicateImportStatusUnknown(Hash256),
/// The block slot exceeds the MAXIMUM_BLOCK_SLOT_NUMBER.
///
/// ## Peer scoring
@@ -704,91 +703,49 @@ pub struct ExecutionPendingBlock<T: BeaconChainTypes> {
pub payload_verification_handle: PayloadVerificationHandle,
}
pub trait IntoGossipVerifiedBlockContents<T: BeaconChainTypes>: Sized {
pub trait IntoGossipVerifiedBlock<T: BeaconChainTypes>: Sized {
fn into_gossip_verified_block(
self,
chain: &BeaconChain<T>,
) -> Result<GossipVerifiedBlockContents<T>, BlockContentsError>;
fn inner_block(&self) -> &SignedBeaconBlock<T::EthSpec>;
) -> Result<GossipVerifiedBlock<T>, BlockError>;
fn inner_block(&self) -> Arc<SignedBeaconBlock<T::EthSpec>>;
}
impl<T: BeaconChainTypes> IntoGossipVerifiedBlockContents<T> for GossipVerifiedBlockContents<T> {
impl<T: BeaconChainTypes> IntoGossipVerifiedBlock<T> for GossipVerifiedBlock<T> {
fn into_gossip_verified_block(
self,
_chain: &BeaconChain<T>,
) -> Result<GossipVerifiedBlockContents<T>, BlockContentsError> {
) -> Result<GossipVerifiedBlock<T>, BlockError> {
Ok(self)
}
fn inner_block(&self) -> &SignedBeaconBlock<T::EthSpec> {
self.0.block.as_block()
fn inner_block(&self) -> Arc<SignedBeaconBlock<T::EthSpec>> {
self.block_cloned()
}
}
impl<T: BeaconChainTypes> IntoGossipVerifiedBlockContents<T> for PublishBlockRequest<T::EthSpec> {
impl<T: BeaconChainTypes> IntoGossipVerifiedBlock<T> for Arc<SignedBeaconBlock<T::EthSpec>> {
fn into_gossip_verified_block(
self,
chain: &BeaconChain<T>,
) -> Result<GossipVerifiedBlockContents<T>, BlockContentsError> {
let (block, blobs) = self.deconstruct();
let peer_das_enabled = chain.spec.is_peer_das_enabled_for_epoch(block.epoch());
let (gossip_verified_blobs, gossip_verified_data_columns) = if peer_das_enabled {
let gossip_verified_data_columns =
build_gossip_verified_data_columns(chain, &block, blobs.map(|(_, blobs)| blobs))?;
(None, gossip_verified_data_columns)
} else {
let gossip_verified_blobs = build_gossip_verified_blobs(chain, &block, blobs)?;
(gossip_verified_blobs, None)
};
let gossip_verified_block = GossipVerifiedBlock::new(block, chain)?;
Ok((
gossip_verified_block,
gossip_verified_blobs,
gossip_verified_data_columns,
))
) -> Result<GossipVerifiedBlock<T>, BlockError> {
GossipVerifiedBlock::new(self, chain)
}
fn inner_block(&self) -> &SignedBeaconBlock<T::EthSpec> {
self.signed_block()
fn inner_block(&self) -> Arc<SignedBeaconBlock<T::EthSpec>> {
self.clone()
}
}
#[allow(clippy::type_complexity)]
fn build_gossip_verified_blobs<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
block: &Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
blobs: Option<(KzgProofs<T::EthSpec>, BlobsList<T::EthSpec>)>,
) -> Result<Option<GossipVerifiedBlobList<T>>, BlockContentsError> {
blobs
.map(|(kzg_proofs, blobs)| {
let mut gossip_verified_blobs = vec![];
for (i, (kzg_proof, blob)) in kzg_proofs.iter().zip(blobs).enumerate() {
let _timer =
metrics::start_timer(&metrics::BLOB_SIDECAR_INCLUSION_PROOF_COMPUTATION);
let blob = BlobSidecar::new(i, blob, block, *kzg_proof)
.map_err(BlockContentsError::BlobSidecarError)?;
drop(_timer);
let gossip_verified_blob =
GossipVerifiedBlob::new(Arc::new(blob), i as u64, chain)?;
gossip_verified_blobs.push(gossip_verified_blob);
}
let gossip_verified_blobs = VariableList::from(gossip_verified_blobs);
Ok::<_, BlockContentsError>(gossip_verified_blobs)
})
.transpose()
}
fn build_gossip_verified_data_columns<T: BeaconChainTypes>(
pub fn build_blob_data_column_sidecars<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
block: &SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>,
blobs: Option<BlobsList<T::EthSpec>>,
) -> Result<Option<GossipVerifiedDataColumnList<T>>, BlockContentsError> {
blobs
blobs: BlobsList<T::EthSpec>,
) -> Result<DataColumnSidecarList<T::EthSpec>, DataColumnSidecarError> {
// Only attempt to build data columns if blobs is non empty to avoid skewing the metrics.
.filter(|b| !b.is_empty())
.map(|blobs| {
if blobs.is_empty() {
return Ok(vec![]);
}
let mut timer = metrics::start_timer_vec(
&metrics::DATA_COLUMN_SIDECAR_COMPUTATION,
&[&blobs.len().to_string()],
@@ -796,23 +753,7 @@ fn build_gossip_verified_data_columns<T: BeaconChainTypes>(
let sidecars = blobs_to_data_column_sidecars(&blobs, block, &chain.kzg, &chain.spec)
.discard_timer_on_break(&mut timer)?;
drop(timer);
let mut gossip_verified_data_columns = vec![];
for sidecar in sidecars {
let subnet = DataColumnSubnetId::from_column_index::<T::EthSpec>(
sidecar.index as usize,
&chain.spec,
);
let column = GossipVerifiedDataColumn::new(sidecar, subnet.into(), chain)?;
gossip_verified_data_columns.push(column);
}
let gossip_verified_data_columns = RuntimeVariableList::new(
gossip_verified_data_columns,
chain.spec.number_of_columns,
)
.map_err(DataColumnSidecarError::SszError)?;
Ok::<_, BlockContentsError>(gossip_verified_data_columns)
})
.transpose()
Ok(sidecars)
}
/// Implemented on types that can be converted into a `ExecutionPendingBlock`.
@@ -912,7 +853,7 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
// already know this block.
let fork_choice_read_lock = chain.canonical_head.fork_choice_read_lock();
if fork_choice_read_lock.contains_block(&block_root) {
return Err(BlockError::BlockIsAlreadyKnown(block_root));
return Err(BlockError::DuplicateFullyImported(block_root));
}
// Do not process a block that doesn't descend from the finalized root.
@@ -1046,7 +987,9 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
SeenBlock::Slashable => {
return Err(BlockError::Slashable);
}
SeenBlock::Duplicate => return Err(BlockError::BlockIsAlreadyKnown(block_root)),
SeenBlock::Duplicate => {
return Err(BlockError::DuplicateImportStatusUnknown(block_root))
}
SeenBlock::UniqueNonSlashable => {}
};
@@ -1894,7 +1837,7 @@ pub fn check_block_relevancy<T: BeaconChainTypes>(
.fork_choice_read_lock()
.contains_block(&block_root)
{
return Err(BlockError::BlockIsAlreadyKnown(block_root));
return Err(BlockError::DuplicateFullyImported(block_root));
}
Ok(block_root)

View File

@@ -1,19 +1,14 @@
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlobList};
use crate::block_verification::BlockError;
use crate::data_availability_checker::AvailabilityCheckError;
pub use crate::data_availability_checker::{AvailableBlock, MaybeAvailableBlock};
use crate::data_column_verification::{
CustodyDataColumn, CustodyDataColumnList, GossipDataColumnError, GossipVerifiedDataColumnList,
};
use crate::data_column_verification::{CustodyDataColumn, CustodyDataColumnList};
use crate::eth1_finalization_cache::Eth1FinalizationData;
use crate::{get_block_root, GossipVerifiedBlock, PayloadVerificationOutcome};
use crate::{get_block_root, PayloadVerificationOutcome};
use derivative::Derivative;
use ssz_types::VariableList;
use state_processing::ConsensusContext;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use types::blob_sidecar::{self, BlobIdentifier, FixedBlobSidecarList};
use types::data_column_sidecar::{self};
use types::blob_sidecar::{BlobIdentifier, FixedBlobSidecarList};
use types::{
BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, ChainSpec, Epoch, EthSpec,
Hash256, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
@@ -390,67 +385,6 @@ impl<E: EthSpec> BlockImportData<E> {
}
}
pub type GossipVerifiedBlockContents<E> = (
GossipVerifiedBlock<E>,
Option<GossipVerifiedBlobList<E>>,
Option<GossipVerifiedDataColumnList<E>>,
);
#[derive(Debug)]
pub enum BlockContentsError {
BlockError(BlockError),
BlobError(GossipBlobError),
BlobSidecarError(blob_sidecar::BlobSidecarError),
DataColumnError(GossipDataColumnError),
DataColumnSidecarError(data_column_sidecar::DataColumnSidecarError),
}
impl From<BlockError> for BlockContentsError {
fn from(value: BlockError) -> Self {
Self::BlockError(value)
}
}
impl From<GossipBlobError> for BlockContentsError {
fn from(value: GossipBlobError) -> Self {
Self::BlobError(value)
}
}
impl From<GossipDataColumnError> for BlockContentsError {
fn from(value: GossipDataColumnError) -> Self {
Self::DataColumnError(value)
}
}
impl From<data_column_sidecar::DataColumnSidecarError> for BlockContentsError {
fn from(value: data_column_sidecar::DataColumnSidecarError) -> Self {
Self::DataColumnSidecarError(value)
}
}
impl std::fmt::Display for BlockContentsError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
BlockContentsError::BlockError(err) => {
write!(f, "BlockError({})", err)
}
BlockContentsError::BlobError(err) => {
write!(f, "BlobError({})", err)
}
BlockContentsError::BlobSidecarError(err) => {
write!(f, "BlobSidecarError({:?})", err)
}
BlockContentsError::DataColumnError(err) => {
write!(f, "DataColumnError({:?})", err)
}
BlockContentsError::DataColumnSidecarError(err) => {
write!(f, "DataColumnSidecarError({:?})", err)
}
}
}
}
/// Trait for common block operations.
pub trait AsBlock<E: EthSpec> {
fn slot(&self) -> Slot;

View File

@@ -75,9 +75,9 @@ pub use self::historical_blocks::HistoricalBlockError;
pub use attestation_verification::Error as AttestationError;
pub use beacon_fork_choice_store::{BeaconForkChoiceStore, Error as ForkChoiceStoreError};
pub use block_verification::{
get_block_root, BlockError, ExecutionPayloadError, ExecutionPendingBlock, GossipVerifiedBlock,
IntoExecutionPendingBlock, IntoGossipVerifiedBlockContents, PayloadVerificationOutcome,
PayloadVerificationStatus,
build_blob_data_column_sidecars, get_block_root, BlockError, ExecutionPayloadError,
ExecutionPendingBlock, GossipVerifiedBlock, IntoExecutionPendingBlock, IntoGossipVerifiedBlock,
PayloadVerificationOutcome, PayloadVerificationStatus,
};
pub use block_verification_types::AvailabilityPendingExecutedBlock;
pub use block_verification_types::ExecutedBlock;

View File

@@ -878,7 +878,7 @@ where
let proposer_index = state.get_beacon_proposer_index(slot, &self.spec).unwrap();
// If we produce two blocks for the same slot, they hash up to the same value and
// BeaconChain errors out with `BlockIsAlreadyKnown`. Vary the graffiti so that we produce
// BeaconChain errors out with `DuplicateFullyImported`. Vary the graffiti so that we produce
// different blocks each time.
let graffiti = Graffiti::from(self.rng.lock().gen::<[u8; 32]>());
@@ -940,7 +940,7 @@ where
let proposer_index = state.get_beacon_proposer_index(slot, &self.spec).unwrap();
// If we produce two blocks for the same slot, they hash up to the same value and
// BeaconChain errors out with `BlockIsAlreadyKnown`. Vary the graffiti so that we produce
// BeaconChain errors out with `DuplicateFullyImported`. Vary the graffiti so that we produce
// different blocks each time.
let graffiti = Graffiti::from(self.rng.lock().gen::<[u8; 32]>());

View File

@@ -976,7 +976,7 @@ async fn block_gossip_verification() {
harness
.chain
.process_gossip_blob(gossip_verified)
.process_gossip_blob(gossip_verified, || Ok(()))
.await
.expect("should import valid gossip verified blob");
}
@@ -1173,7 +1173,7 @@ async fn block_gossip_verification() {
assert!(
matches!(
unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(block.clone())).await),
BlockError::BlockIsAlreadyKnown(_),
BlockError::DuplicateImportStatusUnknown(_),
),
"should register any valid signature against the proposer, even if the block failed later verification"
);
@@ -1201,7 +1201,7 @@ async fn block_gossip_verification() {
.verify_block_for_gossip(block.clone())
.await
.expect_err("should error when processing known block"),
BlockError::BlockIsAlreadyKnown(_)
BlockError::DuplicateImportStatusUnknown(_)
),
"the second proposal by this validator should be rejected"
);
@@ -1247,7 +1247,7 @@ async fn verify_block_for_gossip_slashing_detection() {
.unwrap();
harness
.chain
.process_gossip_blob(verified_blob)
.process_gossip_blob(verified_blob, || Ok(()))
.await
.unwrap();
}

View File

@@ -35,7 +35,7 @@ async fn blob_sidecar_event_on_process_gossip_blob() {
let _ = harness
.chain
.process_gossip_blob(gossip_verified_blob)
.process_gossip_blob(gossip_verified_blob, || Ok(()))
.await
.unwrap();

View File

@@ -1298,7 +1298,7 @@ pub fn serve<T: BeaconChainTypes>(
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
publish_blocks::publish_block(
None,
ProvenancedBlock::local(block_contents),
ProvenancedBlock::local_from_publish_request(block_contents),
chain,
&network_tx,
log,
@@ -1340,7 +1340,7 @@ pub fn serve<T: BeaconChainTypes>(
})?;
publish_blocks::publish_block(
None,
ProvenancedBlock::local(block_contents),
ProvenancedBlock::local_from_publish_request(block_contents),
chain,
&network_tx,
log,
@@ -1375,7 +1375,7 @@ pub fn serve<T: BeaconChainTypes>(
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
publish_blocks::publish_block(
None,
ProvenancedBlock::local(block_contents),
ProvenancedBlock::local_from_publish_request(block_contents),
chain,
&network_tx,
log,
@@ -1419,7 +1419,7 @@ pub fn serve<T: BeaconChainTypes>(
})?;
publish_blocks::publish_block(
None,
ProvenancedBlock::local(block_contents),
ProvenancedBlock::local_from_publish_request(block_contents),
chain,
&network_tx,
log,

View File

@@ -1,13 +1,17 @@
use crate::metrics;
use beacon_chain::block_verification_types::{AsBlock, BlockContentsError};
use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use beacon_chain::block_verification_types::AsBlock;
use beacon_chain::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use beacon_chain::validator_monitor::{get_block_delay_ms, timestamp_now};
use beacon_chain::{
AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes, BlockError,
IntoGossipVerifiedBlockContents, NotifyExecutionLayer,
build_blob_data_column_sidecars, AvailabilityProcessingStatus, BeaconChain, BeaconChainError,
BeaconChainTypes, BlockError, IntoGossipVerifiedBlock, NotifyExecutionLayer,
};
use eth2::types::{
BlobsBundle, BroadcastValidation, ErrorMessage, ExecutionPayloadAndBlobs, FullPayloadContents,
PublishBlockRequest, SignedBlockContents,
};
use eth2::types::{into_full_block_and_blobs, BroadcastValidation, ErrorMessage};
use eth2::types::{FullPayloadContents, PublishBlockRequest};
use execution_layer::ProvenancedPayload;
use lighthouse_network::{NetworkGlobals, PubsubMessage};
use network::NetworkMessage;
@@ -15,39 +19,62 @@ use rand::seq::SliceRandom;
use slog::{debug, error, info, warn, Logger};
use slot_clock::SlotClock;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::UnboundedSender;
use tree_hash::TreeHash;
use types::{
AbstractExecPayload, BeaconBlockRef, BlobSidecarList, BlockImportSource, DataColumnSidecarList,
DataColumnSubnetId, EthSpec, ExecPayload, ExecutionBlockHash, ForkName, FullPayload,
FullPayloadBellatrix, Hash256, SignedBeaconBlock, SignedBlindedBeaconBlock, VariableList,
AbstractExecPayload, BeaconBlockRef, BlobSidecar, BlobsList, BlockImportSource,
DataColumnSidecarList, DataColumnSubnetId, EthSpec, ExecPayload, ExecutionBlockHash, ForkName,
FullPayload, FullPayloadBellatrix, Hash256, KzgProofs, SignedBeaconBlock,
SignedBlindedBeaconBlock,
};
use warp::http::StatusCode;
use warp::{reply::Response, Rejection, Reply};
pub enum ProvenancedBlock<T: BeaconChainTypes, B: IntoGossipVerifiedBlockContents<T>> {
pub type UnverifiedBlobs<T> = Option<(
KzgProofs<<T as BeaconChainTypes>::EthSpec>,
BlobsList<<T as BeaconChainTypes>::EthSpec>,
)>;
pub enum ProvenancedBlock<T: BeaconChainTypes, B: IntoGossipVerifiedBlock<T>> {
/// The payload was built using a local EE.
Local(B, PhantomData<T>),
Local(B, UnverifiedBlobs<T>, PhantomData<T>),
/// The payload was build using a remote builder (e.g., via a mev-boost
/// compatible relay).
Builder(B, PhantomData<T>),
Builder(B, UnverifiedBlobs<T>, PhantomData<T>),
}
impl<T: BeaconChainTypes, B: IntoGossipVerifiedBlockContents<T>> ProvenancedBlock<T, B> {
pub fn local(block: B) -> Self {
Self::Local(block, PhantomData)
impl<T: BeaconChainTypes, B: IntoGossipVerifiedBlock<T>> ProvenancedBlock<T, B> {
pub fn local(block: B, blobs: UnverifiedBlobs<T>) -> Self {
Self::Local(block, blobs, PhantomData)
}
pub fn builder(block: B) -> Self {
Self::Builder(block, PhantomData)
pub fn builder(block: B, blobs: UnverifiedBlobs<T>) -> Self {
Self::Builder(block, blobs, PhantomData)
}
}
impl<T: BeaconChainTypes> ProvenancedBlock<T, Arc<SignedBeaconBlock<T::EthSpec>>> {
pub fn local_from_publish_request(request: PublishBlockRequest<T::EthSpec>) -> Self {
match request {
PublishBlockRequest::Block(block) => Self::local(block, None),
PublishBlockRequest::BlockContents(block_contents) => {
let SignedBlockContents {
signed_block,
kzg_proofs,
blobs,
} = block_contents;
Self::local(signed_block, Some((kzg_proofs, blobs)))
}
}
}
}
/// Handles a request from the HTTP API for full blocks.
#[allow(clippy::too_many_arguments)]
pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockContents<T>>(
pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlock<T>>(
block_root: Option<Hash256>,
provenanced_block: ProvenancedBlock<T, B>,
chain: Arc<BeaconChain<T>>,
@@ -59,28 +86,29 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
) -> Result<Response, Rejection> {
let seen_timestamp = timestamp_now();
let (block_contents, is_locally_built_block) = match provenanced_block {
ProvenancedBlock::Local(block_contents, _) => (block_contents, true),
ProvenancedBlock::Builder(block_contents, _) => (block_contents, false),
let (unverified_block, unverified_blobs, is_locally_built_block) = match provenanced_block {
ProvenancedBlock::Local(block, blobs, _) => (block, blobs, true),
ProvenancedBlock::Builder(block, blobs, _) => (block, blobs, false),
};
let provenance = if is_locally_built_block {
"local"
} else {
"builder"
};
let block = block_contents.inner_block().clone();
let delay = get_block_delay_ms(seen_timestamp, block.message(), &chain.slot_clock);
let block = unverified_block.inner_block();
debug!(log, "Signed block received in HTTP API"; "slot" => block.slot());
let malicious_withhold_count = chain.config.malicious_withhold_count;
let chain_cloned = chain.clone();
/* actually publish a block */
let publish_block = move |block: Arc<SignedBeaconBlock<T::EthSpec>>,
blobs_opt: Option<BlobSidecarList<T::EthSpec>>,
data_cols_opt: Option<DataColumnSidecarList<T::EthSpec>>,
let publish_block_p2p = move |block: Arc<SignedBeaconBlock<T::EthSpec>>,
should_publish_block: bool,
blob_sidecars: Vec<Arc<BlobSidecar<T::EthSpec>>>,
mut data_column_sidecars: DataColumnSidecarList<T::EthSpec>,
sender,
log,
seen_timestamp| {
seen_timestamp|
-> Result<(), BlockError> {
let publish_timestamp = timestamp_now();
let publish_delay = publish_timestamp
.checked_sub(seen_timestamp)
@@ -92,47 +120,41 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
publish_delay,
);
let mut pubsub_messages = if should_publish_block {
info!(
log,
"Signed block published to network via HTTP API";
"slot" => block.slot(),
"publish_delay_ms" => publish_delay.as_millis()
"blobs_published" => blob_sidecars.len(),
"publish_delay_ms" => publish_delay.as_millis(),
);
vec![PubsubMessage::BeaconBlock(block.clone())]
} else {
vec![]
};
match block.as_ref() {
SignedBeaconBlock::Base(_)
| SignedBeaconBlock::Altair(_)
| SignedBeaconBlock::Bellatrix(_)
| SignedBeaconBlock::Capella(_) => {
crate::publish_pubsub_message(&sender, PubsubMessage::BeaconBlock(block))
crate::publish_pubsub_messages(&sender, pubsub_messages)
.map_err(|_| BlockError::BeaconChainError(BeaconChainError::UnableToPublish))?;
}
SignedBeaconBlock::Deneb(_) | SignedBeaconBlock::Electra(_) => {
let mut pubsub_messages = vec![PubsubMessage::BeaconBlock(block)];
if let Some(blob_sidecars) = blobs_opt {
// Publish blob sidecars
for (blob_index, blob) in blob_sidecars.into_iter().enumerate() {
pubsub_messages.push(PubsubMessage::BlobSidecar(Box::new((
blob_index as u64,
blob,
))));
for blob in blob_sidecars.into_iter() {
pubsub_messages.push(PubsubMessage::BlobSidecar(Box::new((blob.index, blob))));
}
}
if let Some(data_col_sidecars) = data_cols_opt {
let mut data_col_sidecars = data_col_sidecars.to_vec();
if malicious_withhold_count > 0 {
let columns_to_keep = data_col_sidecars
let columns_to_keep = data_column_sidecars
.len()
.saturating_sub(malicious_withhold_count);
// Randomize columns before dropping the last malicious_withhold_count items
data_col_sidecars.shuffle(&mut rand::thread_rng());
data_col_sidecars = data_col_sidecars
.into_iter()
.take(columns_to_keep)
.collect::<Vec<_>>();
data_column_sidecars.shuffle(&mut rand::thread_rng());
drop(data_column_sidecars.drain(columns_to_keep..));
}
for data_col in data_col_sidecars {
for data_col in data_column_sidecars {
let subnet = DataColumnSubnetId::from_column_index::<T::EthSpec>(
data_col.index as usize,
&chain_cloned.spec,
@@ -141,7 +163,6 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
subnet, data_col,
))));
}
}
crate::publish_pubsub_messages(&sender, pubsub_messages)
.map_err(|_| BlockError::BeaconChainError(BeaconChainError::UnableToPublish))?;
}
@@ -150,72 +171,162 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
};
/* only publish if gossip- and consensus-valid and equivocation-free */
let chain_clone = chain.clone();
let slot = block.message().slot();
let proposer_index = block.message().proposer_index();
let sender_clone = network_tx.clone();
let log_clone = log.clone();
/* if we can form a `GossipVerifiedBlock`, we've passed our basic gossip checks */
let (gossip_verified_block, gossip_verified_blobs, gossip_verified_data_columns) =
match block_contents.into_gossip_verified_block(&chain) {
Ok(b) => b,
Err(BlockContentsError::BlockError(BlockError::BlockIsAlreadyKnown(_)))
| Err(BlockContentsError::BlobError(
beacon_chain::blob_verification::GossipBlobError::RepeatBlob { .. },
)) => {
// Allow the status code for duplicate blocks to be overridden based on config.
return Ok(warp::reply::with_status(
warp::reply::json(&ErrorMessage {
code: duplicate_status_code.as_u16(),
message: "duplicate block".to_string(),
stacktraces: vec![],
}),
duplicate_status_code,
)
.into_response());
}
Err(e) => {
warn!(
log,
"Not publishing block - not gossip verified";
"slot" => slot,
"error" => %e
// Convert blobs to either:
//
// 1. Blob sidecars if prior to peer DAS, or
// 2. Data column sidecars if post peer DAS.
let peer_das_enabled = chain.spec.is_peer_das_enabled_for_epoch(block.epoch());
let (blob_sidecars, data_column_sidecars) = match unverified_blobs {
// Pre-PeerDAS: construct blob sidecars for the network.
Some((kzg_proofs, blobs)) if !peer_das_enabled => {
let blob_sidecars = kzg_proofs
.into_iter()
.zip(blobs)
.enumerate()
.map(|(i, (proof, unverified_blob))| {
let _timer = metrics::start_timer(
&beacon_chain::metrics::BLOB_SIDECAR_INCLUSION_PROOF_COMPUTATION,
);
return Err(warp_utils::reject::custom_bad_request(e.to_string()));
let blob_sidecar =
BlobSidecar::new(i, unverified_blob, &block, proof).map(Arc::new);
blob_sidecar.map_err(|e| {
error!(
log,
"Invalid blob - not publishing block";
"error" => ?e,
"blob_index" => i,
"slot" => slot,
);
warp_utils::reject::custom_bad_request(format!("{e:?}"))
})
})
.collect::<Result<Vec<_>, Rejection>>()?;
(blob_sidecars, vec![])
}
// Post PeerDAS: construct data columns.
Some((_, blobs)) => {
// TODO(das): this is sub-optimal and should likely not be happening prior to gossip
// block publishing.
let data_column_sidecars = build_blob_data_column_sidecars(&chain, &block, blobs)
.map_err(|e| {
error!(
log,
"Invalid data column - not publishing block";
"error" => ?e,
"slot" => slot
);
warp_utils::reject::custom_bad_request(format!("{e:?}"))
})?;
(vec![], data_column_sidecars)
}
None => (vec![], vec![]),
};
// TODO(das): We could potentially get rid of these conversions and pass `GossipVerified` types
// to `publish_block`, i.e. have `GossipVerified` types in `PubsubMessage`?
// This saves us from extra code and provides guarantee that published
// components are verified.
// Clone here, so we can take advantage of the `Arc`. The block in `BlockContents` is not,
// `Arc`'d but blobs are.
let block = gossip_verified_block.block.block_cloned();
let blobs_opt = gossip_verified_blobs.as_ref().map(|gossip_verified_blobs| {
let blobs = gossip_verified_blobs
// Gossip verify the block and blobs/data columns separately.
let gossip_verified_block_result = unverified_block.into_gossip_verified_block(&chain);
let gossip_verified_blobs = blob_sidecars
.into_iter()
.map(|blob_sidecar| {
let gossip_verified_blob =
GossipVerifiedBlob::new(blob_sidecar.clone(), blob_sidecar.index, &chain);
match gossip_verified_blob {
Ok(blob) => Ok(Some(blob)),
Err(GossipBlobError::RepeatBlob { proposer, .. }) => {
// Log the error but do not abort publication, we may need to publish the block
// or some of the other blobs if the block & blobs are only partially published
// by the other publisher.
debug!(
log,
"Blob for publication already known";
"blob_index" => blob_sidecar.index,
"slot" => slot,
"proposer" => proposer,
);
Ok(None)
}
Err(e) => {
error!(
log,
"Blob for publication is gossip-invalid";
"blob_index" => blob_sidecar.index,
"slot" => slot,
"error" => ?e,
);
Err(warp_utils::reject::custom_bad_request(e.to_string()))
}
}
})
.collect::<Result<Vec<_>, Rejection>>()?;
let gossip_verified_data_columns = data_column_sidecars
.into_iter()
.map(|data_column_sidecar| {
let column_index = data_column_sidecar.index as usize;
let subnet =
DataColumnSubnetId::from_column_index::<T::EthSpec>(column_index, &chain.spec);
let gossip_verified_column =
GossipVerifiedDataColumn::new(data_column_sidecar, subnet.into(), &chain);
match gossip_verified_column {
Ok(blob) => Ok(Some(blob)),
Err(GossipDataColumnError::PriorKnown { proposer, .. }) => {
// Log the error but do not abort publication, we may need to publish the block
// or some of the other data columns if the block & data columns are only
// partially published by the other publisher.
debug!(
log,
"Data column for publication already known";
"column_index" => column_index,
"slot" => slot,
"proposer" => proposer,
);
Ok(None)
}
Err(e) => {
error!(
log,
"Data column for publication is gossip-invalid";
"column_index" => column_index,
"slot" => slot,
"error" => ?e,
);
Err(warp_utils::reject::custom_bad_request(format!("{e:?}")))
}
}
})
.collect::<Result<Vec<_>, Rejection>>()?;
let publishable_blobs = gossip_verified_blobs
.iter()
.flatten()
.map(|b| b.clone_blob())
.collect::<Vec<_>>();
VariableList::from(blobs)
});
let data_cols_opt = gossip_verified_data_columns
.as_ref()
.map(|gossip_verified_data_columns| {
gossip_verified_data_columns
.into_iter()
.map(|col| col.clone_data_column())
.collect::<Vec<_>>()
let publishable_data_columns = gossip_verified_data_columns
.iter()
.flatten()
.map(|b| b.clone_data_column())
.collect::<Vec<_>>();
let block_root = block_root.unwrap_or_else(|| {
gossip_verified_block_result.as_ref().map_or_else(
|_| block.canonical_root(),
|verified_block| verified_block.block_root,
)
});
let block_root = block_root.unwrap_or(gossip_verified_block.block_root);
let should_publish_block = gossip_verified_block_result.is_ok();
if let BroadcastValidation::Gossip = validation_level {
publish_block(
publish_block_p2p(
block.clone(),
blobs_opt.clone(),
data_cols_opt.clone(),
should_publish_block,
publishable_blobs.clone(),
publishable_data_columns.clone(),
sender_clone.clone(),
log.clone(),
seen_timestamp,
@@ -223,40 +334,41 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
.map_err(|_| warp_utils::reject::custom_server_error("unable to publish".into()))?;
}
let block_clone = block.clone();
let publish_fn = move || match validation_level {
BroadcastValidation::Gossip => Ok(()),
BroadcastValidation::Consensus => publish_block(
block_clone,
blobs_opt,
data_cols_opt,
sender_clone,
log_clone,
let publish_fn_completed = Arc::new(AtomicBool::new(false));
let block_to_publish = block.clone();
let publish_fn = || {
match validation_level {
BroadcastValidation::Gossip => (),
BroadcastValidation::Consensus => publish_block_p2p(
block_to_publish.clone(),
should_publish_block,
publishable_blobs.clone(),
publishable_data_columns.clone(),
sender_clone.clone(),
log.clone(),
seen_timestamp,
),
)?,
BroadcastValidation::ConsensusAndEquivocation => {
check_slashable(
&chain_clone,
&blobs_opt,
block_root,
&block_clone,
&log_clone,
)?;
publish_block(
block_clone,
blobs_opt,
data_cols_opt,
sender_clone,
log_clone,
check_slashable(&chain, block_root, &block_to_publish, &log)?;
publish_block_p2p(
block_to_publish.clone(),
should_publish_block,
publishable_blobs.clone(),
publishable_data_columns.clone(),
sender_clone.clone(),
log.clone(),
seen_timestamp,
)
)?;
}
};
publish_fn_completed.store(true, Ordering::SeqCst);
Ok(())
};
if let Some(gossip_verified_blobs) = gossip_verified_blobs {
for blob in gossip_verified_blobs {
if let Err(e) = Box::pin(chain.process_gossip_blob(blob)).await {
for blob in gossip_verified_blobs.into_iter().flatten() {
// Importing the blobs could trigger block import and network publication in the case
// where the block was already seen on gossip.
if let Err(e) = Box::pin(chain.process_gossip_blob(blob, &publish_fn)).await {
let msg = format!("Invalid blob: {e}");
return if let BroadcastValidation::Gossip = validation_level {
Err(warp_utils::reject::broadcast_without_import(msg))
@@ -270,24 +382,33 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
};
}
}
}
if let Some(gossip_verified_data_columns) = gossip_verified_data_columns {
if gossip_verified_data_columns
.iter()
.map(Option::is_some)
.count()
> 0
{
let custody_columns_indices = &network_globals.custody_columns;
let custody_columns = gossip_verified_data_columns
.into_iter()
.flatten()
.filter(|data_column| custody_columns_indices.contains(&data_column.index()))
.collect();
if let Err(e) = Box::pin(chain.process_gossip_data_columns(custody_columns)).await {
// Importing the columns could trigger block import and network publication in the case
// where the block was already seen on gossip.
if let Err(e) =
Box::pin(chain.process_gossip_data_columns(custody_columns, publish_fn)).await
{
let msg = format!("Invalid data column: {e}");
return if let BroadcastValidation::Gossip = validation_level {
Err(warp_utils::reject::broadcast_without_import(msg))
} else {
error!(
log,
"Invalid blob provided to HTTP API";
"Invalid data column during block publication";
"reason" => &msg
);
Err(warp_utils::reject::custom_bad_request(msg))
@@ -295,23 +416,117 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
}
}
match Box::pin(chain.process_block(
match gossip_verified_block_result {
Ok(gossip_verified_block) => {
let import_result = Box::pin(chain.process_block(
block_root,
gossip_verified_block,
NotifyExecutionLayer::Yes,
BlockImportSource::HttpApi,
publish_fn,
))
.await;
post_block_import_logging_and_response(
import_result,
validation_level,
block,
is_locally_built_block,
seen_timestamp,
&chain,
&log,
)
.await
{
Ok(AvailabilityProcessingStatus::Imported(root)) => {
}
Err(BlockError::DuplicateFullyImported(root)) => {
if publish_fn_completed.load(Ordering::SeqCst) {
post_block_import_logging_and_response(
Ok(AvailabilityProcessingStatus::Imported(root)),
validation_level,
block,
is_locally_built_block,
seen_timestamp,
&chain,
&log,
)
.await
} else {
// None of the components provided in this HTTP request were new, so this was an
// entirely redundant duplicate request. Return a status code indicating this,
// which can be overridden based on config.
Ok(warp::reply::with_status(
warp::reply::json(&ErrorMessage {
code: duplicate_status_code.as_u16(),
message: "duplicate block".to_string(),
stacktraces: vec![],
}),
duplicate_status_code,
)
.into_response())
}
}
Err(BlockError::DuplicateImportStatusUnknown(root)) => {
debug!(
log,
"Block previously seen";
"block_root" => ?root,
"slot" => block.slot(),
);
let import_result = Box::pin(chain.process_block(
block_root,
block.clone(),
NotifyExecutionLayer::Yes,
BlockImportSource::HttpApi,
publish_fn,
))
.await;
post_block_import_logging_and_response(
import_result,
validation_level,
block,
is_locally_built_block,
seen_timestamp,
&chain,
&log,
)
.await
}
Err(e) => {
warn!(
log,
"Not publishing block - not gossip verified";
"slot" => slot,
"error" => %e
);
Err(warp_utils::reject::custom_bad_request(e.to_string()))
}
}
}
async fn post_block_import_logging_and_response<T: BeaconChainTypes>(
result: Result<AvailabilityProcessingStatus, BlockError>,
validation_level: BroadcastValidation,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
is_locally_built_block: bool,
seen_timestamp: Duration,
chain: &Arc<BeaconChain<T>>,
log: &Logger,
) -> Result<Response, Rejection> {
match result {
// The `DuplicateFullyImported` case here captures the case where the block finishes
// being imported after gossip verification. It could be that it finished imported as a
// result of the block being imported from gossip, OR it could be that it finished importing
// after processing of a gossip blob. In the latter case we MUST run fork choice to
// re-compute the head.
Ok(AvailabilityProcessingStatus::Imported(root))
| Err(BlockError::DuplicateFullyImported(root)) => {
let delay = get_block_delay_ms(seen_timestamp, block.message(), &chain.slot_clock);
info!(
log,
"Valid block from HTTP API";
"block_delay" => ?delay,
"root" => format!("{}", root),
"proposer_index" => proposer_index,
"slot" =>slot,
"root" => %root,
"proposer_index" => block.message().proposer_index(),
"slot" => block.slot(),
);
// Notify the validator monitor.
@@ -330,7 +545,7 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
// blocks built with builders we consider the broadcast time to be
// when the blinded block is published to the builder.
if is_locally_built_block {
late_block_logging(&chain, seen_timestamp, block.message(), root, "local", &log)
late_block_logging(chain, seen_timestamp, block.message(), root, "local", log)
}
Ok(warp::reply().into_response())
}
@@ -359,11 +574,10 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
if let BroadcastValidation::Gossip = validation_level {
Err(warp_utils::reject::broadcast_without_import(format!("{e}")))
} else {
let msg = format!("{:?}", e);
error!(
log,
"Invalid block provided to HTTP API";
"reason" => &msg
"reason" => ?e,
);
Err(warp_utils::reject::custom_bad_request(format!(
"Invalid block: {e}"
@@ -385,7 +599,7 @@ pub async fn publish_blinded_block<T: BeaconChainTypes>(
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
) -> Result<Response, Rejection> {
let block_root = blinded_block.canonical_root();
let full_block: ProvenancedBlock<T, PublishBlockRequest<T::EthSpec>> =
let full_block =
reconstruct_block(chain.clone(), block_root, blinded_block, log.clone()).await?;
publish_block::<T, _>(
Some(block_root),
@@ -408,7 +622,7 @@ pub async fn reconstruct_block<T: BeaconChainTypes>(
block_root: Hash256,
block: Arc<SignedBlindedBeaconBlock<T::EthSpec>>,
log: Logger,
) -> Result<ProvenancedBlock<T, PublishBlockRequest<T::EthSpec>>, Rejection> {
) -> Result<ProvenancedBlock<T, Arc<SignedBeaconBlock<T::EthSpec>>>, Rejection> {
let full_payload_opt = if let Ok(payload_header) = block.message().body().execution_payload() {
let el = chain.execution_layer.as_ref().ok_or_else(|| {
warp_utils::reject::custom_server_error("Missing execution layer".to_string())
@@ -474,14 +688,17 @@ pub async fn reconstruct_block<T: BeaconChainTypes>(
match full_payload_opt {
// A block without a payload is pre-merge and we consider it locally
// built.
None => into_full_block_and_blobs(block, None).map(ProvenancedBlock::local),
None => block
.try_into_full_block(None)
.ok_or("Failed to build full block with payload".to_string())
.map(|full_block| ProvenancedBlock::local(Arc::new(full_block), None)),
Some(ProvenancedPayload::Local(full_payload_contents)) => {
into_full_block_and_blobs(block, Some(full_payload_contents))
.map(ProvenancedBlock::local)
into_full_block_and_blobs::<T>(block, full_payload_contents)
.map(|(block, blobs)| ProvenancedBlock::local(block, blobs))
}
Some(ProvenancedPayload::Builder(full_payload_contents)) => {
into_full_block_and_blobs(block, Some(full_payload_contents))
.map(ProvenancedBlock::builder)
into_full_block_and_blobs::<T>(block, full_payload_contents)
.map(|(block, blobs)| ProvenancedBlock::builder(block, blobs))
}
}
.map_err(|e| {
@@ -540,28 +757,11 @@ fn late_block_logging<T: BeaconChainTypes, P: AbstractExecPayload<T::EthSpec>>(
/// Check if any of the blobs or the block are slashable. Returns `BlockError::Slashable` if so.
fn check_slashable<T: BeaconChainTypes>(
chain_clone: &BeaconChain<T>,
blobs_opt: &Option<BlobSidecarList<T::EthSpec>>,
block_root: Hash256,
block_clone: &SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>,
log_clone: &Logger,
) -> Result<(), BlockError> {
let slashable_cache = chain_clone.observed_slashable.read();
if let Some(blobs) = blobs_opt.as_ref() {
blobs.iter().try_for_each(|blob| {
if slashable_cache
.is_slashable(blob.slot(), blob.block_proposer_index(), blob.block_root())
.map_err(|e| BlockError::BeaconChainError(e.into()))?
{
warn!(
log_clone,
"Not publishing equivocating blob";
"slot" => block_clone.slot()
);
return Err(BlockError::Slashable);
}
Ok(())
})?;
};
if slashable_cache
.is_slashable(
block_clone.slot(),
@@ -579,3 +779,38 @@ fn check_slashable<T: BeaconChainTypes>(
}
Ok(())
}
/// Converting from a `SignedBlindedBeaconBlock` into a full `SignedBlockContents`.
#[allow(clippy::type_complexity)]
pub fn into_full_block_and_blobs<T: BeaconChainTypes>(
blinded_block: SignedBlindedBeaconBlock<T::EthSpec>,
maybe_full_payload_contents: FullPayloadContents<T::EthSpec>,
) -> Result<(Arc<SignedBeaconBlock<T::EthSpec>>, UnverifiedBlobs<T>), String> {
match maybe_full_payload_contents {
// This variant implies a pre-deneb block
FullPayloadContents::Payload(execution_payload) => {
let signed_block = blinded_block
.try_into_full_block(Some(execution_payload))
.ok_or("Failed to build full block with payload".to_string())?;
Ok((Arc::new(signed_block), None))
}
// This variant implies a post-deneb block
FullPayloadContents::PayloadAndBlobs(payload_and_blobs) => {
let ExecutionPayloadAndBlobs {
execution_payload,
blobs_bundle,
} = payload_and_blobs;
let signed_block = blinded_block
.try_into_full_block(Some(execution_payload))
.ok_or("Failed to build full block with payload".to_string())?;
let BlobsBundle {
commitments: _,
proofs,
blobs,
} = blobs_bundle;
Ok((Arc::new(signed_block), Some((proofs, blobs))))
}
}
}

View File

@@ -61,7 +61,8 @@ type Mutator<E> = BoxedMutator<E, MemoryStore<E>, MemoryStore<E>>;
impl<E: EthSpec> InteractiveTester<E> {
pub async fn new(spec: Option<ChainSpec>, validator_count: usize) -> Self {
Self::new_with_initializer_and_mutator(spec, validator_count, None, None).await
Self::new_with_initializer_and_mutator(spec, validator_count, None, None, Config::default())
.await
}
pub async fn new_with_initializer_and_mutator(
@@ -69,6 +70,7 @@ impl<E: EthSpec> InteractiveTester<E> {
validator_count: usize,
initializer: Option<Initializer<E>>,
mutator: Option<Mutator<E>>,
config: Config,
) -> Self {
let mut harness_builder = BeaconChainHarness::builder(E::default())
.spec_or_default(spec.map(Arc::new))
@@ -99,8 +101,9 @@ impl<E: EthSpec> InteractiveTester<E> {
listening_socket,
network_rx,
..
} = create_api_server(
} = create_api_server_with_config(
harness.chain.clone(),
config,
&harness.runtime,
harness.logger().clone(),
)
@@ -131,6 +134,15 @@ pub async fn create_api_server<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
test_runtime: &TestRuntime,
log: Logger,
) -> ApiServer<T, impl Future<Output = ()>> {
create_api_server_with_config(chain, Config::default(), test_runtime, log).await
}
pub async fn create_api_server_with_config<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
http_config: Config,
test_runtime: &TestRuntime,
log: Logger,
) -> ApiServer<T, impl Future<Output = ()>> {
// Use port 0 to allocate a new unused port.
let port = 0;
@@ -220,12 +232,14 @@ pub async fn create_api_server<T: BeaconChainTypes>(
.unwrap();
let ctx = Arc::new(Context {
// Override several config fields with defaults. If these need to be tweaked in future
// we could remove these overrides.
config: Config {
enabled: true,
listen_port: port,
data_dir: std::path::PathBuf::from(DEFAULT_ROOT_DIR),
enable_light_client_server: true,
..Config::default()
..http_config
},
chain: Some(chain),
network_senders: Some(network_senders),

View File

@@ -1,13 +1,16 @@
use beacon_chain::blob_verification::GossipVerifiedBlob;
use beacon_chain::{
test_utils::{AttestationStrategy, BlockStrategy},
GossipVerifiedBlock, IntoGossipVerifiedBlockContents,
GossipVerifiedBlock, IntoGossipVerifiedBlock,
};
use eth2::reqwest::StatusCode;
use eth2::types::{BroadcastValidation, PublishBlockRequest};
use http_api::test_utils::InteractiveTester;
use http_api::{publish_blinded_block, publish_block, reconstruct_block, ProvenancedBlock};
use http_api::{publish_blinded_block, publish_block, reconstruct_block, Config, ProvenancedBlock};
use std::sync::Arc;
use types::{Epoch, EthSpec, FixedBytesExtended, ForkName, Hash256, MainnetEthSpec, Slot};
use types::{
BlobSidecar, Epoch, EthSpec, FixedBytesExtended, ForkName, Hash256, MainnetEthSpec, Slot,
};
use warp::Rejection;
use warp_utils::reject::CustomBadRequest;
@@ -81,7 +84,7 @@ pub async fn gossip_invalid() {
/* mandated by Beacon API spec */
assert_eq!(error_response.status(), Some(StatusCode::BAD_REQUEST));
assert_server_message_error(error_response, "BAD_REQUEST: BlockError(NotFinalizedDescendant { block_parent_root: 0x0000000000000000000000000000000000000000000000000000000000000000 })".to_string());
assert_server_message_error(error_response, "BAD_REQUEST: NotFinalizedDescendant { block_parent_root: 0x0000000000000000000000000000000000000000000000000000000000000000 }".to_string());
}
/// This test checks that a block that is valid from a gossip perspective is accepted when using `broadcast_validation=gossip`.
@@ -266,7 +269,7 @@ pub async fn consensus_invalid() {
/* mandated by Beacon API spec */
assert_eq!(error_response.status(), Some(StatusCode::BAD_REQUEST));
assert_server_message_error(error_response, "BAD_REQUEST: BlockError(NotFinalizedDescendant { block_parent_root: 0x0000000000000000000000000000000000000000000000000000000000000000 })".to_string());
assert_server_message_error(error_response, "BAD_REQUEST: NotFinalizedDescendant { block_parent_root: 0x0000000000000000000000000000000000000000000000000000000000000000 }".to_string());
}
/// This test checks that a block that is only valid from a gossip perspective is rejected when using `broadcast_validation=consensus`.
@@ -360,10 +363,9 @@ pub async fn consensus_partial_pass_only_consensus() {
);
assert_ne!(block_a.state_root(), block_b.state_root());
let gossip_block_contents_b = PublishBlockRequest::new(block_b, blobs_b)
.into_gossip_verified_block(&tester.harness.chain);
assert!(gossip_block_contents_b.is_ok());
let gossip_block_a = GossipVerifiedBlock::new(block_a.clone().into(), &tester.harness.chain);
let gossip_block_b = block_b.into_gossip_verified_block(&tester.harness.chain);
assert!(gossip_block_b.is_ok());
let gossip_block_a = block_a.into_gossip_verified_block(&tester.harness.chain);
assert!(gossip_block_a.is_err());
/* submit `block_b` which should induce equivocation */
@@ -372,7 +374,7 @@ pub async fn consensus_partial_pass_only_consensus() {
let publication_result = publish_block(
None,
ProvenancedBlock::local(gossip_block_contents_b.unwrap()),
ProvenancedBlock::local(gossip_block_b.unwrap(), blobs_b),
tester.harness.chain.clone(),
&channel.0,
test_logger,
@@ -382,7 +384,7 @@ pub async fn consensus_partial_pass_only_consensus() {
)
.await;
assert!(publication_result.is_ok());
assert!(publication_result.is_ok(), "{publication_result:?}");
assert!(tester
.harness
.chain
@@ -481,7 +483,7 @@ pub async fn equivocation_invalid() {
/* mandated by Beacon API spec */
assert_eq!(error_response.status(), Some(StatusCode::BAD_REQUEST));
assert_server_message_error(error_response, "BAD_REQUEST: BlockError(NotFinalizedDescendant { block_parent_root: 0x0000000000000000000000000000000000000000000000000000000000000000 })".to_string());
assert_server_message_error(error_response, "BAD_REQUEST: NotFinalizedDescendant { block_parent_root: 0x0000000000000000000000000000000000000000000000000000000000000000 }".to_string());
}
/// This test checks that a block that is valid from both a gossip and consensus perspective is rejected when using `broadcast_validation=consensus_and_equivocation`.
@@ -555,10 +557,7 @@ pub async fn equivocation_consensus_early_equivocation() {
let error_response: eth2::Error = response.err().unwrap();
assert_eq!(error_response.status(), Some(StatusCode::BAD_REQUEST));
assert_server_message_error(
error_response,
"BAD_REQUEST: BlockError(Slashable)".to_string(),
);
assert_server_message_error(error_response, "BAD_REQUEST: Slashable".to_string());
}
/// This test checks that a block that is only valid from a gossip perspective is rejected when using `broadcast_validation=consensus_and_equivocation`.
@@ -642,7 +641,7 @@ pub async fn equivocation_consensus_late_equivocation() {
let slot_b = slot_a + 1;
let state_a = tester.harness.get_current_state();
let ((block_a, blobs_a), mut state_after_a) =
let ((block_a, _blobs_a), mut state_after_a) =
tester.harness.make_block(state_a.clone(), slot_b).await;
let ((block_b, blobs_b), mut state_after_b) = tester.harness.make_block(state_a, slot_b).await;
@@ -657,19 +656,18 @@ pub async fn equivocation_consensus_late_equivocation() {
);
assert_ne!(block_a.state_root(), block_b.state_root());
let gossip_block_contents_b = PublishBlockRequest::new(block_b, blobs_b)
.into_gossip_verified_block(&tester.harness.chain);
assert!(gossip_block_contents_b.is_ok());
let gossip_block_contents_a = PublishBlockRequest::new(block_a, blobs_a)
.into_gossip_verified_block(&tester.harness.chain);
assert!(gossip_block_contents_a.is_err());
let gossip_block_b = block_b.into_gossip_verified_block(&tester.harness.chain);
assert!(gossip_block_b.is_ok());
let gossip_block_a = block_a.into_gossip_verified_block(&tester.harness.chain);
assert!(gossip_block_a.is_err());
let channel = tokio::sync::mpsc::unbounded_channel();
let network_globals = tester.ctx.network_globals.clone().unwrap();
let publication_result = publish_block(
None,
ProvenancedBlock::local(gossip_block_contents_b.unwrap()),
ProvenancedBlock::local(gossip_block_b.unwrap(), blobs_b),
tester.harness.chain,
&channel.0,
test_logger,
@@ -686,8 +684,8 @@ pub async fn equivocation_consensus_late_equivocation() {
assert!(publication_error.find::<CustomBadRequest>().is_some());
assert_eq!(
*publication_error.find::<CustomBadRequest>().unwrap().0,
"proposal for this slot and proposer has already been seen".to_string()
publication_error.find::<CustomBadRequest>().unwrap().0,
"proposal for this slot and proposer has already been seen"
);
}
@@ -783,7 +781,7 @@ pub async fn blinded_gossip_invalid() {
/* mandated by Beacon API spec */
assert_eq!(error_response.status(), Some(StatusCode::BAD_REQUEST));
assert_server_message_error(error_response, "BAD_REQUEST: BlockError(NotFinalizedDescendant { block_parent_root: 0x0000000000000000000000000000000000000000000000000000000000000000 })".to_string());
assert_server_message_error(error_response, "BAD_REQUEST: NotFinalizedDescendant { block_parent_root: 0x0000000000000000000000000000000000000000000000000000000000000000 }".to_string());
}
/// This test checks that a block that is valid from a gossip perspective is accepted when using `broadcast_validation=gossip`.
@@ -961,7 +959,7 @@ pub async fn blinded_consensus_invalid() {
/* mandated by Beacon API spec */
assert_eq!(error_response.status(), Some(StatusCode::BAD_REQUEST));
assert_server_message_error(error_response, "BAD_REQUEST: BlockError(NotFinalizedDescendant { block_parent_root: 0x0000000000000000000000000000000000000000000000000000000000000000 })".to_string());
assert_server_message_error(error_response, "BAD_REQUEST: NotFinalizedDescendant { block_parent_root: 0x0000000000000000000000000000000000000000000000000000000000000000 }".to_string());
}
/// This test checks that a block that is only valid from a gossip perspective is rejected when using `broadcast_validation=consensus`.
@@ -1099,7 +1097,7 @@ pub async fn blinded_equivocation_invalid() {
/* mandated by Beacon API spec */
assert_eq!(error_response.status(), Some(StatusCode::BAD_REQUEST));
assert_server_message_error(error_response, "BAD_REQUEST: BlockError(NotFinalizedDescendant { block_parent_root: 0x0000000000000000000000000000000000000000000000000000000000000000 })".to_string());
assert_server_message_error(error_response, "BAD_REQUEST: NotFinalizedDescendant { block_parent_root: 0x0000000000000000000000000000000000000000000000000000000000000000 }".to_string());
}
/// This test checks that a block that is valid from both a gossip and consensus perspective is rejected when using `broadcast_validation=consensus_and_equivocation`.
@@ -1169,10 +1167,7 @@ pub async fn blinded_equivocation_consensus_early_equivocation() {
let error_response: eth2::Error = response.err().unwrap();
assert_eq!(error_response.status(), Some(StatusCode::BAD_REQUEST));
assert_server_message_error(
error_response,
"BAD_REQUEST: BlockError(Slashable)".to_string(),
);
assert_server_message_error(error_response, "BAD_REQUEST: Slashable".to_string());
}
/// This test checks that a block that is only valid from a gossip perspective is rejected when using `broadcast_validation=consensus_and_equivocation`.
@@ -1295,19 +1290,17 @@ pub async fn blinded_equivocation_consensus_late_equivocation() {
.unwrap();
let inner_block_a = match unblinded_block_a {
ProvenancedBlock::Local(a, _) => a,
ProvenancedBlock::Builder(a, _) => a,
ProvenancedBlock::Local(a, _, _) => a,
ProvenancedBlock::Builder(a, _, _) => a,
};
let inner_block_b = match unblinded_block_b {
ProvenancedBlock::Local(b, _) => b,
ProvenancedBlock::Builder(b, _) => b,
ProvenancedBlock::Local(b, _, _) => b,
ProvenancedBlock::Builder(b, _, _) => b,
};
let gossip_block_b =
GossipVerifiedBlock::new(inner_block_b.clone().deconstruct().0, &tester.harness.chain);
let gossip_block_b = GossipVerifiedBlock::new(inner_block_b, &tester.harness.chain);
assert!(gossip_block_b.is_ok());
let gossip_block_a =
GossipVerifiedBlock::new(inner_block_a.clone().deconstruct().0, &tester.harness.chain);
let gossip_block_a = GossipVerifiedBlock::new(inner_block_a, &tester.harness.chain);
assert!(gossip_block_a.is_err());
let channel = tokio::sync::mpsc::unbounded_channel();
@@ -1374,6 +1367,438 @@ pub async fn blinded_equivocation_full_pass() {
.block_is_known_to_fork_choice(&block.canonical_root()));
}
/// This test checks that an HTTP POST request with the block & blobs succeeds with a 200 response
/// even if the block has already been seen on gossip without any blobs.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
pub async fn block_seen_on_gossip_without_blobs() {
let validation_level: Option<BroadcastValidation> = Some(BroadcastValidation::Gossip);
// Validator count needs to be at least 32 or proposer boost gets set to 0 when computing
// `validator_count // 32`.
let validator_count = 64;
let num_initial: u64 = 31;
let spec = ForkName::latest().make_genesis_spec(E::default_spec());
let tester = InteractiveTester::<E>::new(Some(spec), validator_count).await;
// Create some chain depth.
tester.harness.advance_slot();
tester
.harness
.extend_chain(
num_initial as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
tester.harness.advance_slot();
let slot_a = Slot::new(num_initial);
let slot_b = slot_a + 1;
let state_a = tester.harness.get_current_state();
let ((block, blobs), _) = tester.harness.make_block(state_a, slot_b).await;
let blobs = blobs.expect("should have some blobs");
assert_ne!(blobs.0.len(), 0);
// Simulate the block being seen on gossip.
block
.clone()
.into_gossip_verified_block(&tester.harness.chain)
.unwrap();
// It should not yet be added to fork choice because blobs have not been seen.
assert!(!tester
.harness
.chain
.block_is_known_to_fork_choice(&block.canonical_root()));
// Post the block *and* blobs to the HTTP API.
let response: Result<(), eth2::Error> = tester
.client
.post_beacon_blocks_v2(
&PublishBlockRequest::new(block.clone(), Some(blobs)),
validation_level,
)
.await;
// This should result in the block being fully imported.
response.unwrap();
assert!(tester
.harness
.chain
.block_is_known_to_fork_choice(&block.canonical_root()));
}
/// This test checks that an HTTP POST request with the block & blobs succeeds with a 200 response
/// even if the block has already been seen on gossip without all blobs.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
pub async fn block_seen_on_gossip_with_some_blobs() {
let validation_level: Option<BroadcastValidation> = Some(BroadcastValidation::Gossip);
// Validator count needs to be at least 32 or proposer boost gets set to 0 when computing
// `validator_count // 32`.
let validator_count = 64;
let num_initial: u64 = 31;
let spec = ForkName::latest().make_genesis_spec(E::default_spec());
let tester = InteractiveTester::<E>::new(Some(spec), validator_count).await;
// Create some chain depth.
tester.harness.advance_slot();
tester
.harness
.extend_chain(
num_initial as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
tester.harness.advance_slot();
let slot_a = Slot::new(num_initial);
let slot_b = slot_a + 1;
let state_a = tester.harness.get_current_state();
let ((block, blobs), _) = tester.harness.make_block(state_a, slot_b).await;
let blobs = blobs.expect("should have some blobs");
assert!(
blobs.0.len() >= 2,
"need at least 2 blobs for partial reveal"
);
let partial_kzg_proofs = vec![blobs.0.get(0).unwrap().clone()];
let partial_blobs = vec![blobs.1.get(0).unwrap().clone()];
// Simulate the block being seen on gossip.
block
.clone()
.into_gossip_verified_block(&tester.harness.chain)
.unwrap();
// Simulate some of the blobs being seen on gossip.
for (i, (kzg_proof, blob)) in partial_kzg_proofs
.into_iter()
.zip(partial_blobs)
.enumerate()
{
let sidecar = Arc::new(BlobSidecar::new(i, blob, &block, kzg_proof).unwrap());
let gossip_blob =
GossipVerifiedBlob::new(sidecar, i as u64, &tester.harness.chain).unwrap();
tester
.harness
.chain
.process_gossip_blob(gossip_blob, || panic!("should not publish block yet"))
.await
.unwrap();
}
// It should not yet be added to fork choice because all blobs have not been seen.
assert!(!tester
.harness
.chain
.block_is_known_to_fork_choice(&block.canonical_root()));
// Post the block *and* all blobs to the HTTP API.
let response: Result<(), eth2::Error> = tester
.client
.post_beacon_blocks_v2(
&PublishBlockRequest::new(block.clone(), Some(blobs)),
validation_level,
)
.await;
// This should result in the block being fully imported.
response.unwrap();
assert!(tester
.harness
.chain
.block_is_known_to_fork_choice(&block.canonical_root()));
}
/// This test checks that an HTTP POST request with the block & blobs succeeds with a 200 response
/// even if the blobs have already been seen on gossip.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
pub async fn blobs_seen_on_gossip_without_block() {
let validation_level: Option<BroadcastValidation> = Some(BroadcastValidation::Gossip);
// Validator count needs to be at least 32 or proposer boost gets set to 0 when computing
// `validator_count // 32`.
let validator_count = 64;
let num_initial: u64 = 31;
let spec = ForkName::latest().make_genesis_spec(E::default_spec());
let tester = InteractiveTester::<E>::new(Some(spec), validator_count).await;
// Create some chain depth.
tester.harness.advance_slot();
tester
.harness
.extend_chain(
num_initial as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
tester.harness.advance_slot();
let slot_a = Slot::new(num_initial);
let slot_b = slot_a + 1;
let state_a = tester.harness.get_current_state();
let ((block, blobs), _) = tester.harness.make_block(state_a, slot_b).await;
let (kzg_proofs, blobs) = blobs.expect("should have some blobs");
// Simulate the blobs being seen on gossip.
for (i, (kzg_proof, blob)) in kzg_proofs
.clone()
.into_iter()
.zip(blobs.clone())
.enumerate()
{
let sidecar = Arc::new(BlobSidecar::new(i, blob, &block, kzg_proof).unwrap());
let gossip_blob =
GossipVerifiedBlob::new(sidecar, i as u64, &tester.harness.chain).unwrap();
tester
.harness
.chain
.process_gossip_blob(gossip_blob, || panic!("should not publish block yet"))
.await
.unwrap();
}
// It should not yet be added to fork choice because the block has not been seen.
assert!(!tester
.harness
.chain
.block_is_known_to_fork_choice(&block.canonical_root()));
// Post the block *and* all blobs to the HTTP API.
let response: Result<(), eth2::Error> = tester
.client
.post_beacon_blocks_v2(
&PublishBlockRequest::new(block.clone(), Some((kzg_proofs, blobs))),
validation_level,
)
.await;
// This should result in the block being fully imported.
response.unwrap();
assert!(tester
.harness
.chain
.block_is_known_to_fork_choice(&block.canonical_root()));
}
/// This test checks that an HTTP POST request with the block succeeds with a 200 response
/// if just the blobs have already been seen on gossip.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
pub async fn blobs_seen_on_gossip_without_block_and_no_http_blobs() {
let validation_level: Option<BroadcastValidation> = Some(BroadcastValidation::Gossip);
// Validator count needs to be at least 32 or proposer boost gets set to 0 when computing
// `validator_count // 32`.
let validator_count = 64;
let num_initial: u64 = 31;
let spec = ForkName::latest().make_genesis_spec(E::default_spec());
let tester = InteractiveTester::<E>::new(Some(spec), validator_count).await;
// Create some chain depth.
tester.harness.advance_slot();
tester
.harness
.extend_chain(
num_initial as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
tester.harness.advance_slot();
let slot_a = Slot::new(num_initial);
let slot_b = slot_a + 1;
let state_a = tester.harness.get_current_state();
let ((block, blobs), _) = tester.harness.make_block(state_a, slot_b).await;
let (kzg_proofs, blobs) = blobs.expect("should have some blobs");
assert!(!blobs.is_empty());
// Simulate the blobs being seen on gossip.
for (i, (kzg_proof, blob)) in kzg_proofs
.clone()
.into_iter()
.zip(blobs.clone())
.enumerate()
{
let sidecar = Arc::new(BlobSidecar::new(i, blob, &block, kzg_proof).unwrap());
let gossip_blob =
GossipVerifiedBlob::new(sidecar, i as u64, &tester.harness.chain).unwrap();
tester
.harness
.chain
.process_gossip_blob(gossip_blob, || panic!("should not publish block yet"))
.await
.unwrap();
}
// It should not yet be added to fork choice because the block has not been seen.
assert!(!tester
.harness
.chain
.block_is_known_to_fork_choice(&block.canonical_root()));
// Post just the block to the HTTP API (blob lists are empty).
let response: Result<(), eth2::Error> = tester
.client
.post_beacon_blocks_v2(
&PublishBlockRequest::new(
block.clone(),
Some((Default::default(), Default::default())),
),
validation_level,
)
.await;
// This should result in the block being fully imported.
response.unwrap();
assert!(tester
.harness
.chain
.block_is_known_to_fork_choice(&block.canonical_root()));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
pub async fn slashable_blobs_seen_on_gossip_cause_failure() {
let validation_level: Option<BroadcastValidation> =
Some(BroadcastValidation::ConsensusAndEquivocation);
// Validator count needs to be at least 32 or proposer boost gets set to 0 when computing
// `validator_count // 32`.
let validator_count = 64;
let num_initial: u64 = 31;
let spec = ForkName::latest().make_genesis_spec(E::default_spec());
let tester = InteractiveTester::<E>::new(Some(spec), validator_count).await;
// Create some chain depth.
tester.harness.advance_slot();
tester
.harness
.extend_chain(
num_initial as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
tester.harness.advance_slot();
let slot_a = Slot::new(num_initial);
let slot_b = slot_a + 1;
let state_a = tester.harness.get_current_state();
let ((block_a, blobs_a), _) = tester.harness.make_block(state_a.clone(), slot_b).await;
let ((block_b, blobs_b), _) = tester.harness.make_block(state_a, slot_b).await;
let (kzg_proofs_a, blobs_a) = blobs_a.expect("should have some blobs");
let (kzg_proofs_b, blobs_b) = blobs_b.expect("should have some blobs");
// Simulate the blobs of block B being seen on gossip.
for (i, (kzg_proof, blob)) in kzg_proofs_b.into_iter().zip(blobs_b).enumerate() {
let sidecar = Arc::new(BlobSidecar::new(i, blob, &block_b, kzg_proof).unwrap());
let gossip_blob =
GossipVerifiedBlob::new(sidecar, i as u64, &tester.harness.chain).unwrap();
tester
.harness
.chain
.process_gossip_blob(gossip_blob, || panic!("should not publish block yet"))
.await
.unwrap();
}
// It should not yet be added to fork choice because block B has not been seen.
assert!(!tester
.harness
.chain
.block_is_known_to_fork_choice(&block_b.canonical_root()));
// Post block A *and* all its blobs to the HTTP API.
let response: Result<(), eth2::Error> = tester
.client
.post_beacon_blocks_v2(
&PublishBlockRequest::new(block_a.clone(), Some((kzg_proofs_a, blobs_a))),
validation_level,
)
.await;
// This should not result in block A being fully imported.
response.unwrap_err();
assert!(!tester
.harness
.chain
.block_is_known_to_fork_choice(&block_a.canonical_root()));
}
/// This test checks that an HTTP POST request with a duplicate block & blobs results in the
/// `duplicate_status_code` being returned.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
pub async fn duplicate_block_status_code() {
let validation_level: Option<BroadcastValidation> = Some(BroadcastValidation::Gossip);
// Validator count needs to be at least 32 or proposer boost gets set to 0 when computing
// `validator_count // 32`.
let validator_count = 64;
let num_initial: u64 = 31;
let spec = ForkName::latest().make_genesis_spec(E::default_spec());
let duplicate_block_status_code = StatusCode::IM_A_TEAPOT;
let tester = InteractiveTester::<E>::new_with_initializer_and_mutator(
Some(spec),
validator_count,
None,
None,
Config {
duplicate_block_status_code,
..Config::default()
},
)
.await;
// Create some chain depth.
tester.harness.advance_slot();
tester
.harness
.extend_chain(
num_initial as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
tester.harness.advance_slot();
let slot_a = Slot::new(num_initial);
let slot_b = slot_a + 1;
let state_a = tester.harness.get_current_state();
let ((block, blobs), _) = tester.harness.make_block(state_a, slot_b).await;
let (kzg_proofs, blobs) = blobs.expect("should have some blobs");
// Post the block blobs to the HTTP API once.
let block_request = PublishBlockRequest::new(block.clone(), Some((kzg_proofs, blobs)));
let response: Result<(), eth2::Error> = tester
.client
.post_beacon_blocks_v2(&block_request, validation_level)
.await;
// This should result in the block being fully imported.
response.unwrap();
assert!(tester
.harness
.chain
.block_is_known_to_fork_choice(&block.canonical_root()));
// Post again.
let duplicate_response: Result<(), eth2::Error> = tester
.client
.post_beacon_blocks_v2(&block_request, validation_level)
.await;
let err = duplicate_response.unwrap_err();
assert_eq!(err.status().unwrap(), duplicate_block_status_code);
}
fn assert_server_message_error(error_response: eth2::Error, expected_message: String) {
let eth2::Error::ServerMessage(err) = error_response else {
panic!("Not a eth2::Error::ServerMessage");

View File

@@ -386,6 +386,7 @@ async fn bls_to_execution_changes_update_all_around_capella_fork() {
.genesis_state_ephemeral_store(genesis_state)
})),
None,
Default::default(),
)
.await;
let harness = &tester.harness;

View File

@@ -72,6 +72,7 @@ async fn state_by_root_pruned_from_fork_choice() {
})
})),
None,
Default::default(),
)
.await;
@@ -427,6 +428,7 @@ pub async fn proposer_boost_re_org_test(
DisallowedReOrgOffsets::new::<E>(disallowed_offsets).unwrap(),
)
})),
Default::default(),
)
.await;
let harness = &tester.harness;

View File

@@ -936,7 +936,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let blob_slot = verified_blob.slot();
let blob_index = verified_blob.id().index;
let result = self.chain.process_gossip_blob(verified_blob).await;
let result = self
.chain
.process_gossip_blob(verified_blob, || Ok(()))
.await;
match &result {
Ok(AvailabilityProcessingStatus::Imported(block_root)) => {
@@ -963,7 +966,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"block_root" => %block_root,
);
}
Err(BlockError::BlockIsAlreadyKnown(_)) => {
Err(BlockError::DuplicateFullyImported(_)) => {
debug!(
self.log,
"Ignoring gossip blob already imported";
@@ -1013,7 +1016,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
match self
.chain
.process_gossip_data_columns(vec![verified_data_column])
.process_gossip_data_columns(vec![verified_data_column], || Ok(()))
.await
{
Ok((availability, data_columns_to_publish)) => {
@@ -1050,7 +1053,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
}
}
Err(BlockError::BlockIsAlreadyKnown(_)) => {
Err(BlockError::DuplicateFullyImported(_)) => {
debug!(
self.log,
"Ignoring gossip column already imported";
@@ -1242,7 +1245,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
return None;
}
Err(BlockError::BlockIsAlreadyKnown(_)) => {
Err(
BlockError::DuplicateFullyImported(_)
| BlockError::DuplicateImportStatusUnknown(..),
) => {
debug!(
self.log,
"Gossip block is already known";

View File

@@ -294,7 +294,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"slot" => %slot,
);
}
Err(BlockError::BlockIsAlreadyKnown(_)) => {
Err(BlockError::DuplicateFullyImported(_)) => {
debug!(
self.log,
"Blobs have already been imported";
@@ -355,7 +355,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
}
}
Err(BlockError::BlockIsAlreadyKnown(_)) => {
Err(BlockError::DuplicateFullyImported(_)) => {
debug!(
self.log,
"Custody columns have already been imported";
@@ -715,7 +715,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
peer_action: Some(PeerAction::LowToleranceError),
})
}
BlockError::BlockIsAlreadyKnown(_) => {
BlockError::DuplicateFullyImported(_)
| BlockError::DuplicateImportStatusUnknown(..) => {
// This can happen for many reasons. Head sync's can download multiples and parent
// lookups can download blocks before range sync
Ok(())

View File

@@ -517,7 +517,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
let action = match result {
BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(_))
| BlockProcessingResult::Err(BlockError::BlockIsAlreadyKnown(_)) => {
| BlockProcessingResult::Err(BlockError::DuplicateFullyImported(..)) => {
// Successfully imported
request_state.on_processing_success()?;
Action::Continue
@@ -541,6 +541,16 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
Action::Retry
}
}
BlockProcessingResult::Err(BlockError::DuplicateImportStatusUnknown(..)) => {
// This is unreachable because RPC blocks do not undergo gossip verification, and
// this error can *only* come from gossip verification.
error!(
self.log,
"Single block lookup hit unreachable condition";
"block_root" => ?block_root
);
Action::Drop
}
BlockProcessingResult::Ignored => {
// Beacon processor signalled to ignore the block processing result.
// This implies that the cpu is overloaded. Drop the request.

View File

@@ -1471,7 +1471,7 @@ fn test_parent_lookup_happy_path() {
// Processing succeeds, now the rest of the chain should be sent for processing.
rig.parent_block_processed(
block_root,
BlockError::BlockIsAlreadyKnown(block_root).into(),
BlockError::DuplicateFullyImported(block_root).into(),
);
rig.expect_parent_chain_process();
rig.parent_chain_processed_success(block_root, &[]);
@@ -1839,7 +1839,7 @@ fn test_same_chain_race_condition() {
rig.log(&format!("Block {i} was removed and is already known"));
rig.parent_block_processed(
chain_hash,
BlockError::BlockIsAlreadyKnown(block.canonical_root()).into(),
BlockError::DuplicateFullyImported(block.canonical_root()).into(),
)
} else {
rig.log(&format!("Block {i} ParentUnknown"));

View File

@@ -629,8 +629,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
// cache nor in the request state of this lookup. Therefore, the block must either: (1) not
// be downloaded yet or (2) the block is already imported into the fork-choice.
// In case (1) the lookup must either successfully download the block or get dropped.
// In case (2) the block will be downloaded, processed, reach `BlockIsAlreadyKnown` and
// get dropped as completed.
// In case (2) the block will be downloaded, processed, reach `DuplicateFullyImported`
// and get dropped as completed.
return Ok(LookupRequestResult::Pending("waiting for block download"));
};
let expected_blobs = block.num_expected_blobs();

View File

@@ -1883,42 +1883,6 @@ impl<E: EthSpec> PublishBlockRequest<E> {
}
}
/// Converting from a `SignedBlindedBeaconBlock` into a full `SignedBlockContents`.
pub fn into_full_block_and_blobs<E: EthSpec>(
blinded_block: SignedBlindedBeaconBlock<E>,
maybe_full_payload_contents: Option<FullPayloadContents<E>>,
) -> Result<PublishBlockRequest<E>, String> {
match maybe_full_payload_contents {
None => {
let signed_block = blinded_block
.try_into_full_block(None)
.ok_or("Failed to build full block with payload".to_string())?;
Ok(PublishBlockRequest::new(Arc::new(signed_block), None))
}
// This variant implies a pre-deneb block
Some(FullPayloadContents::Payload(execution_payload)) => {
let signed_block = blinded_block
.try_into_full_block(Some(execution_payload))
.ok_or("Failed to build full block with payload".to_string())?;
Ok(PublishBlockRequest::new(Arc::new(signed_block), None))
}
// This variant implies a post-deneb block
Some(FullPayloadContents::PayloadAndBlobs(payload_and_blobs)) => {
let signed_block = blinded_block
.try_into_full_block(Some(payload_and_blobs.execution_payload))
.ok_or("Failed to build full block with payload".to_string())?;
Ok(PublishBlockRequest::new(
Arc::new(signed_block),
Some((
payload_and_blobs.blobs_bundle.proofs,
payload_and_blobs.blobs_bundle.blobs,
)),
))
}
}
}
impl<E: EthSpec> TryFrom<Arc<SignedBeaconBlock<E>>> for PublishBlockRequest<E> {
type Error = &'static str;
fn try_from(block: Arc<SignedBeaconBlock<E>>) -> Result<Self, Self::Error> {

View File

@@ -161,6 +161,7 @@ pub enum DataColumnSidecarError {
DataColumnIndexOutOfBounds,
KzgCommitmentInclusionProofOutOfBounds,
KzgError(KzgError),
KzgNotInitialized,
MissingBlobSidecars,
PreDeneb,
SszError(SszError),

View File

@@ -505,8 +505,8 @@ impl<E: EthSpec> Tester<E> {
}
Err(_) => GossipVerifiedBlob::__assumed_valid(blob_sidecar),
};
let result =
self.block_on_dangerous(self.harness.chain.process_gossip_blob(blob))?;
let result = self
.block_on_dangerous(self.harness.chain.process_gossip_blob(blob, || Ok(())))?;
if valid {
assert!(result.is_ok());
}