mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-30 12:47:05 +00:00
Fix
This commit is contained in:
@@ -3185,7 +3185,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
};
|
||||
|
||||
// Import the blocks into the chain.
|
||||
for signature_verified_block in signature_verified_blocks {
|
||||
for (signature_verified_block, _envelope) in signature_verified_blocks {
|
||||
let block_slot = signature_verified_block.slot();
|
||||
match self
|
||||
.process_block(
|
||||
|
||||
@@ -60,6 +60,7 @@ use crate::execution_payload::{
|
||||
};
|
||||
use crate::kzg_utils::blobs_to_data_column_sidecars;
|
||||
use crate::observed_block_producers::SeenBlock;
|
||||
use crate::payload_envelope_verification::{AvailableEnvelope, EnvelopeError};
|
||||
use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS;
|
||||
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
|
||||
use crate::{
|
||||
@@ -324,6 +325,20 @@ pub enum BlockError {
|
||||
bid_parent_root: Hash256,
|
||||
block_parent_root: Hash256,
|
||||
},
|
||||
/// The child block is known but its parent execution payload envelope has not been received yet.
|
||||
///
|
||||
/// ## Peer scoring
|
||||
///
|
||||
/// It's unclear if this block is valid, but it cannot be fully verified without the parent's
|
||||
/// execution payload envelope.
|
||||
ParentEnvelopeUnknown { parent_root: Hash256 },
|
||||
/// An error occurred while processing the execution payload envelope during range sync.
|
||||
EnvelopeError(Box<EnvelopeError>),
|
||||
|
||||
PayloadEnvelopeError {
|
||||
e: Box<EnvelopeError>,
|
||||
penalize_peer: bool,
|
||||
},
|
||||
}
|
||||
|
||||
/// Which specific signature(s) are invalid in a SignedBeaconBlock
|
||||
@@ -490,6 +505,33 @@ impl From<ArithError> for BlockError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<EnvelopeError> for BlockError {
|
||||
fn from(e: EnvelopeError) -> Self {
|
||||
let penalize_peer = match &e {
|
||||
// REJECT per spec: peer sent invalid envelope data
|
||||
EnvelopeError::BadSignature
|
||||
| EnvelopeError::BuilderIndexMismatch { .. }
|
||||
| EnvelopeError::BlockHashMismatch { .. }
|
||||
| EnvelopeError::SlotMismatch { .. }
|
||||
| EnvelopeError::IncorrectBlockProposer { .. } => true,
|
||||
// IGNORE per spec: not the peer's fault
|
||||
EnvelopeError::BlockRootUnknown { .. }
|
||||
| EnvelopeError::PriorToFinalization { .. }
|
||||
| EnvelopeError::UnknownValidator { .. } => false,
|
||||
// Internal errors: not the peer's fault
|
||||
EnvelopeError::BeaconChainError(_)
|
||||
| EnvelopeError::BeaconStateError(_)
|
||||
| EnvelopeError::EnvelopeProcessingError(_)
|
||||
| EnvelopeError::ExecutionPayloadError(_)
|
||||
| EnvelopeError::ImportError(_) => false,
|
||||
};
|
||||
BlockError::PayloadEnvelopeError {
|
||||
e: Box::new(e),
|
||||
penalize_peer,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Stores information about verifying a payload against an execution engine.
|
||||
#[derive(Debug, PartialEq, Clone, Encode, Decode)]
|
||||
pub struct PayloadVerificationOutcome {
|
||||
@@ -587,10 +629,17 @@ pub(crate) fn process_block_slash_info<T: BeaconChainTypes, TErr: BlockBlobError
|
||||
/// The given `chain_segment` must contain only blocks from the same epoch, otherwise an error
|
||||
/// will be returned.
|
||||
#[instrument(skip_all)]
|
||||
#[allow(clippy::type_complexity)]
|
||||
pub fn signature_verify_chain_segment<T: BeaconChainTypes>(
|
||||
mut chain_segment: Vec<(Hash256, RangeSyncBlock<T::EthSpec>)>,
|
||||
chain: &BeaconChain<T>,
|
||||
) -> Result<Vec<SignatureVerifiedBlock<T>>, BlockError> {
|
||||
) -> Result<
|
||||
Vec<(
|
||||
SignatureVerifiedBlock<T>,
|
||||
Option<Box<AvailableEnvelope<T::EthSpec>>>,
|
||||
)>,
|
||||
BlockError,
|
||||
> {
|
||||
if chain_segment.is_empty() {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
@@ -619,14 +668,29 @@ pub fn signature_verify_chain_segment<T: BeaconChainTypes>(
|
||||
let consensus_context =
|
||||
ConsensusContext::new(block.slot()).set_current_block_root(block_root);
|
||||
|
||||
let available_block = block.into_available_block();
|
||||
let (available_block, envelope) = match block {
|
||||
RangeSyncBlock::Base(ab) => (ab, None),
|
||||
RangeSyncBlock::Gloas { block, envelope } => {
|
||||
let ab = AvailableBlock::new(
|
||||
block,
|
||||
AvailableBlockData::NoData,
|
||||
&chain.data_availability_checker,
|
||||
chain.spec.clone(),
|
||||
)
|
||||
.map_err(BlockError::AvailabilityCheck)?;
|
||||
(ab, envelope)
|
||||
}
|
||||
};
|
||||
available_blocks.push(available_block.clone());
|
||||
signature_verified_blocks.push(SignatureVerifiedBlock {
|
||||
signature_verified_blocks.push((
|
||||
SignatureVerifiedBlock {
|
||||
block: MaybeAvailableBlock::Available(available_block),
|
||||
block_root,
|
||||
parent: None,
|
||||
consensus_context,
|
||||
});
|
||||
},
|
||||
envelope,
|
||||
));
|
||||
}
|
||||
// TODO(gloas) When implementing range and backfill sync for gloas
|
||||
// we need a batch verify kzg function in the new da checker as well.
|
||||
@@ -637,7 +701,7 @@ pub fn signature_verify_chain_segment<T: BeaconChainTypes>(
|
||||
// verify signatures
|
||||
let pubkey_cache = get_validator_pubkey_cache(chain)?;
|
||||
let mut signature_verifier = get_signature_verifier(&state, &pubkey_cache, &chain.spec);
|
||||
for svb in &mut signature_verified_blocks {
|
||||
for (svb, _) in &mut signature_verified_blocks {
|
||||
signature_verifier
|
||||
.include_all_signatures(svb.block.as_block(), &mut svb.consensus_context)?;
|
||||
}
|
||||
@@ -648,7 +712,7 @@ pub fn signature_verify_chain_segment<T: BeaconChainTypes>(
|
||||
|
||||
drop(pubkey_cache);
|
||||
|
||||
if let Some(signature_verified_block) = signature_verified_blocks.first_mut() {
|
||||
if let Some((signature_verified_block, _)) = signature_verified_blocks.first_mut() {
|
||||
signature_verified_block.parent = Some(parent);
|
||||
}
|
||||
|
||||
@@ -1196,7 +1260,7 @@ impl<T: BeaconChainTypes> SignatureVerifiedBlock<T> {
|
||||
let result = info_span!("signature_verify").in_scope(|| signature_verifier.verify());
|
||||
match result {
|
||||
Ok(_) => {
|
||||
// gloas blocks are always available.
|
||||
// Gloas blocks are always available — data arrives via the envelope.
|
||||
let maybe_available = if chain
|
||||
.spec
|
||||
.fork_name_at_slot::<T::EthSpec>(block.slot())
|
||||
@@ -1951,6 +2015,21 @@ fn load_parent<T: BeaconChainTypes, B: AsBlock<T::EthSpec>>(
|
||||
BlockError::from(BeaconChainError::MissingBeaconBlock(block.parent_root()))
|
||||
})?;
|
||||
|
||||
// For post-Gloas parent blocks, the execution payload arrives via the envelope.
|
||||
// If the parent's execution payload envelope hasn't arrived yet,
|
||||
// return an unknown parent error so the block gets sent to the
|
||||
// reprocess queue.
|
||||
if chain
|
||||
.spec
|
||||
.fork_name_at_slot::<T::EthSpec>(parent_block.slot())
|
||||
.gloas_enabled()
|
||||
{
|
||||
let _envelope = chain
|
||||
.store
|
||||
.get_payload_envelope(&root)?
|
||||
.ok_or(BlockError::ParentEnvelopeUnknown { parent_root: root })?;
|
||||
}
|
||||
|
||||
// Load the parent block's state from the database, returning an error if it is not found.
|
||||
// It is an error because if we know the parent block we should also know the parent state.
|
||||
// Retrieve any state that is advanced through to at most `block.slot()`: this is
|
||||
|
||||
@@ -2,10 +2,11 @@ use crate::data_availability_checker::{AvailabilityCheckError, DataAvailabilityC
|
||||
pub use crate::data_availability_checker::{
|
||||
AvailableBlock, AvailableBlockData, MaybeAvailableBlock,
|
||||
};
|
||||
use crate::payload_envelope_verification::AvailableEnvelope;
|
||||
use crate::{BeaconChainTypes, PayloadVerificationOutcome};
|
||||
use educe::Educe;
|
||||
use state_processing::ConsensusContext;
|
||||
use std::fmt::{Debug, Formatter};
|
||||
use std::hash::{Hash, Hasher};
|
||||
use std::sync::Arc;
|
||||
use types::data::BlobIdentifier;
|
||||
use types::{
|
||||
@@ -45,38 +46,61 @@ impl<E: EthSpec> LookupBlock<E> {
|
||||
/// This includes any and all blobs/columns required, including zero if
|
||||
/// none are required. This can happen if the block is pre-deneb or if
|
||||
/// it's simply past the DA boundary.
|
||||
#[derive(Clone, Educe)]
|
||||
#[educe(Hash(bound(E: EthSpec)))]
|
||||
pub struct RangeSyncBlock<E: EthSpec> {
|
||||
block: AvailableBlock<E>,
|
||||
#[derive(Clone)]
|
||||
pub enum RangeSyncBlock<E: EthSpec> {
|
||||
Base(AvailableBlock<E>),
|
||||
Gloas {
|
||||
block: Arc<SignedBeaconBlock<E>>,
|
||||
envelope: Option<Box<AvailableEnvelope<E>>>,
|
||||
},
|
||||
}
|
||||
|
||||
impl<E: EthSpec> Hash for RangeSyncBlock<E> {
|
||||
fn hash<H: Hasher>(&self, state: &mut H) {
|
||||
self.block_root().hash(state);
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: EthSpec> Debug for RangeSyncBlock<E> {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "RpcBlock({:?})", self.block_root())
|
||||
write!(f, "RangeSyncBlock({:?})", self.block_root())
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: EthSpec> RangeSyncBlock<E> {
|
||||
pub fn block_root(&self) -> Hash256 {
|
||||
self.block.block_root()
|
||||
match self {
|
||||
RangeSyncBlock::Base(block) => block.block_root(),
|
||||
RangeSyncBlock::Gloas { block, .. } => block.canonical_root(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn as_block(&self) -> &SignedBeaconBlock<E> {
|
||||
self.block.block()
|
||||
match self {
|
||||
RangeSyncBlock::Base(block) => block.block(),
|
||||
RangeSyncBlock::Gloas { block, .. } => block,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn block_cloned(&self) -> Arc<SignedBeaconBlock<E>> {
|
||||
self.block.block_cloned()
|
||||
match self {
|
||||
RangeSyncBlock::Base(block) => block.block_cloned(),
|
||||
RangeSyncBlock::Gloas { block, .. } => block.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn block_data(&self) -> &AvailableBlockData<E> {
|
||||
self.block.data()
|
||||
match self {
|
||||
RangeSyncBlock::Base(block) => block.data(),
|
||||
RangeSyncBlock::Gloas { .. } => {
|
||||
unreachable!("block_data called on Gloas variant — use envelope data instead")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: EthSpec> RangeSyncBlock<E> {
|
||||
/// Constructs an `RangeSyncBlock` from a block and availability data.
|
||||
/// Constructs a `RangeSyncBlock` from a block and availability data.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
@@ -94,32 +118,53 @@ impl<E: EthSpec> RangeSyncBlock<E> {
|
||||
T: BeaconChainTypes<EthSpec = E>,
|
||||
{
|
||||
let available_block = AvailableBlock::new(block, block_data, da_checker, spec)?;
|
||||
Ok(Self {
|
||||
block: available_block,
|
||||
})
|
||||
Ok(Self::Base(available_block))
|
||||
}
|
||||
|
||||
pub fn new_gloas(
|
||||
block: Arc<SignedBeaconBlock<E>>,
|
||||
envelope: Option<Box<AvailableEnvelope<E>>>,
|
||||
) -> Self {
|
||||
Self::Gloas { block, envelope }
|
||||
}
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
pub fn deconstruct(self) -> (Hash256, Arc<SignedBeaconBlock<E>>, AvailableBlockData<E>) {
|
||||
self.block.deconstruct()
|
||||
match self {
|
||||
RangeSyncBlock::Base(block) => block.deconstruct(),
|
||||
RangeSyncBlock::Gloas { .. } => {
|
||||
unreachable!("deconstruct called on Gloas variant")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn n_blobs(&self) -> usize {
|
||||
match self.block_data() {
|
||||
match self {
|
||||
RangeSyncBlock::Base(block) => match block.data() {
|
||||
AvailableBlockData::NoData | AvailableBlockData::DataColumns(_) => 0,
|
||||
AvailableBlockData::Blobs(blobs) => blobs.len(),
|
||||
},
|
||||
RangeSyncBlock::Gloas { .. } => 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn n_data_columns(&self) -> usize {
|
||||
match self.block_data() {
|
||||
match self {
|
||||
RangeSyncBlock::Base(block) => match block.data() {
|
||||
AvailableBlockData::NoData | AvailableBlockData::Blobs(_) => 0,
|
||||
AvailableBlockData::DataColumns(columns) => columns.len(),
|
||||
},
|
||||
RangeSyncBlock::Gloas { .. } => 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn into_available_block(self) -> AvailableBlock<E> {
|
||||
self.block
|
||||
match self {
|
||||
RangeSyncBlock::Base(block) => block,
|
||||
RangeSyncBlock::Gloas { .. } => {
|
||||
unreachable!("into_available_block called on Gloas variant")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -387,31 +432,31 @@ impl<E: EthSpec> AsBlock<E> for AvailableBlock<E> {
|
||||
|
||||
impl<E: EthSpec> AsBlock<E> for RangeSyncBlock<E> {
|
||||
fn slot(&self) -> Slot {
|
||||
self.as_block().slot()
|
||||
RangeSyncBlock::as_block(self).slot()
|
||||
}
|
||||
fn epoch(&self) -> Epoch {
|
||||
self.as_block().epoch()
|
||||
RangeSyncBlock::as_block(self).epoch()
|
||||
}
|
||||
fn parent_root(&self) -> Hash256 {
|
||||
self.as_block().parent_root()
|
||||
RangeSyncBlock::as_block(self).parent_root()
|
||||
}
|
||||
fn state_root(&self) -> Hash256 {
|
||||
self.as_block().state_root()
|
||||
RangeSyncBlock::as_block(self).state_root()
|
||||
}
|
||||
fn signed_block_header(&self) -> SignedBeaconBlockHeader {
|
||||
self.as_block().signed_block_header()
|
||||
RangeSyncBlock::as_block(self).signed_block_header()
|
||||
}
|
||||
fn message(&self) -> BeaconBlockRef<'_, E> {
|
||||
self.as_block().message()
|
||||
RangeSyncBlock::as_block(self).message()
|
||||
}
|
||||
fn as_block(&self) -> &SignedBeaconBlock<E> {
|
||||
self.block.as_block()
|
||||
RangeSyncBlock::as_block(self)
|
||||
}
|
||||
fn block_cloned(&self) -> Arc<SignedBeaconBlock<E>> {
|
||||
self.block.block_cloned()
|
||||
RangeSyncBlock::block_cloned(self)
|
||||
}
|
||||
fn canonical_root(&self) -> Hash256 {
|
||||
self.block.block_root()
|
||||
self.block_root()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -21,7 +21,7 @@ use tracing::{debug, error, instrument};
|
||||
use types::data::{BlobIdentifier, FixedBlobSidecarList, PartialDataColumn};
|
||||
use types::{
|
||||
BlobSidecar, BlobSidecarList, BlockImportSource, ChainSpec, DataColumnSidecar,
|
||||
DataColumnSidecarList, Epoch, EthSpec, Hash256, PartialDataColumnSidecarError,
|
||||
DataColumnSidecarList, Epoch, EthSpec, ForkName, Hash256, PartialDataColumnSidecarError,
|
||||
PartialDataColumnSidecarRef, SignedBeaconBlock, Slot, new_non_zero_usize,
|
||||
};
|
||||
|
||||
@@ -539,6 +539,11 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
|
||||
self.da_check_required_for_epoch(epoch) && self.spec.is_peer_das_enabled_for_epoch(epoch)
|
||||
}
|
||||
|
||||
/// Determines if execution payload envelopes are required for an epoch (Gloas and later).
|
||||
pub fn envelopes_required_for_epoch(&self, epoch: Epoch) -> bool {
|
||||
self.spec.fork_name_at_epoch(epoch) >= ForkName::Gloas
|
||||
}
|
||||
|
||||
/// See `Self::blobs_required_for_epoch`
|
||||
fn blobs_required_for_block(&self, block: &SignedBeaconBlock<T::EthSpec>) -> bool {
|
||||
block.num_expected_blobs() > 0 && self.blobs_required_for_epoch(block.epoch())
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
//! ```
|
||||
|
||||
use state_processing::envelope_processing::EnvelopeProcessingError;
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::Arc;
|
||||
use store::Error as DBError;
|
||||
use strum::AsRefStr;
|
||||
@@ -40,7 +41,15 @@ mod payload_notifier;
|
||||
|
||||
pub use execution_pending_envelope::ExecutionPendingEnvelope;
|
||||
|
||||
#[derive(Debug)]
|
||||
// TODO(gloas): could remove this type completely, or remove the generic
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct EnvelopeImportData<E: EthSpec> {
|
||||
pub block_root: Hash256,
|
||||
_phantom: PhantomData<E>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
#[allow(dead_code)]
|
||||
pub struct AvailableEnvelope<E: EthSpec> {
|
||||
envelope: Arc<SignedExecutionPayloadEnvelope<E>>,
|
||||
pub columns: DataColumnSidecarList<E>,
|
||||
|
||||
@@ -417,6 +417,9 @@ pub enum Work<E: EthSpec> {
|
||||
RpcBlobs {
|
||||
process_fn: AsyncFn,
|
||||
},
|
||||
RpcPayloadEnvelope {
|
||||
process_fn: AsyncFn,
|
||||
},
|
||||
RpcCustodyColumn(AsyncFn),
|
||||
ColumnReconstruction(AsyncFn),
|
||||
IgnoredRpcBlock {
|
||||
@@ -483,6 +486,7 @@ pub enum WorkType {
|
||||
GossipLightClientOptimisticUpdate,
|
||||
RpcBlock,
|
||||
RpcBlobs,
|
||||
RpcPayloadEnvelope,
|
||||
RpcCustodyColumn,
|
||||
ColumnReconstruction,
|
||||
IgnoredRpcBlock,
|
||||
@@ -545,6 +549,7 @@ impl<E: EthSpec> Work<E> {
|
||||
Work::GossipProposerPreferences(_) => WorkType::GossipProposerPreferences,
|
||||
Work::RpcBlock { .. } => WorkType::RpcBlock,
|
||||
Work::RpcBlobs { .. } => WorkType::RpcBlobs,
|
||||
Work::RpcPayloadEnvelope { .. } => WorkType::RpcPayloadEnvelope,
|
||||
Work::RpcCustodyColumn { .. } => WorkType::RpcCustodyColumn,
|
||||
Work::ColumnReconstruction(_) => WorkType::ColumnReconstruction,
|
||||
Work::IgnoredRpcBlock { .. } => WorkType::IgnoredRpcBlock,
|
||||
@@ -1183,7 +1188,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
|
||||
Work::GossipLightClientOptimisticUpdate { .. } => work_queues
|
||||
.lc_gossip_optimistic_update_queue
|
||||
.push(work, work_id),
|
||||
Work::RpcBlock { .. } | Work::IgnoredRpcBlock { .. } => {
|
||||
Work::RpcBlock { .. }
|
||||
| Work::IgnoredRpcBlock { .. }
|
||||
| Work::RpcPayloadEnvelope { .. } => {
|
||||
work_queues.rpc_block_queue.push(work, work_id)
|
||||
}
|
||||
Work::RpcBlobs { .. } => work_queues.rpc_blob_queue.push(work, work_id),
|
||||
@@ -1318,7 +1325,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
|
||||
WorkType::GossipLightClientOptimisticUpdate => {
|
||||
work_queues.lc_gossip_optimistic_update_queue.len()
|
||||
}
|
||||
WorkType::RpcBlock => work_queues.rpc_block_queue.len(),
|
||||
WorkType::RpcBlock | WorkType::RpcPayloadEnvelope => {
|
||||
work_queues.rpc_block_queue.len()
|
||||
}
|
||||
WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => {
|
||||
work_queues.rpc_blob_queue.len()
|
||||
}
|
||||
@@ -1513,6 +1522,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
|
||||
beacon_block_root: _,
|
||||
}
|
||||
| Work::RpcBlobs { process_fn }
|
||||
| Work::RpcPayloadEnvelope { process_fn }
|
||||
| Work::RpcCustodyColumn(process_fn)
|
||||
| Work::ColumnReconstruction(process_fn) => task_spawner.spawn_async(process_fn),
|
||||
Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn),
|
||||
|
||||
@@ -31,6 +31,10 @@ pub enum SyncRequestId {
|
||||
BlobsByRange(BlobsByRangeRequestId),
|
||||
/// Data columns by range request
|
||||
DataColumnsByRange(DataColumnsByRangeRequestId),
|
||||
/// Request searching for an execution payload envelope given a block root.
|
||||
SinglePayloadEnvelope { id: SingleLookupReqId },
|
||||
/// Payload envelopes by range request
|
||||
PayloadEnvelopesByRange(PayloadEnvelopesByRangeRequestId),
|
||||
}
|
||||
|
||||
/// Request ID for data_columns_by_root requests. Block lookups do not issue this request directly.
|
||||
@@ -76,6 +80,14 @@ pub enum DataColumnsByRangeRequester {
|
||||
CustodyBackfillSync(CustodyBackFillBatchRequestId),
|
||||
}
|
||||
|
||||
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
|
||||
pub struct PayloadEnvelopesByRangeRequestId {
|
||||
/// Id to identify this attempt at a payload_envelopes_by_range request for `parent_request_id`
|
||||
pub id: Id,
|
||||
/// The Id of the overall By Range request for block components.
|
||||
pub parent_request_id: ComponentsByRangeRequestId,
|
||||
}
|
||||
|
||||
/// Block components by range request for range sync. Includes an ID for downstream consumers to
|
||||
/// handle retries and tie all their sub requests together.
|
||||
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
|
||||
@@ -252,6 +264,12 @@ macro_rules! impl_display {
|
||||
impl_display!(BlocksByRangeRequestId, "{}/{}", id, parent_request_id);
|
||||
impl_display!(BlobsByRangeRequestId, "{}/{}", id, parent_request_id);
|
||||
impl_display!(DataColumnsByRangeRequestId, "{}/{}", id, parent_request_id);
|
||||
impl_display!(
|
||||
PayloadEnvelopesByRangeRequestId,
|
||||
"{}/{}",
|
||||
id,
|
||||
parent_request_id
|
||||
);
|
||||
impl_display!(ComponentsByRangeRequestId, "{}/{}", id, requester);
|
||||
impl_display!(DataColumnsByRootRequestId, "{}/{}", id, requester);
|
||||
impl_display!(SingleLookupReqId, "{}/Lookup/{}", req_id, lookup_id);
|
||||
|
||||
@@ -1853,6 +1853,21 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
error!(error = %e, "Internal block gossip validation error");
|
||||
return None;
|
||||
}
|
||||
Err(BlockError::ParentEnvelopeUnknown { .. }) => {
|
||||
// Gossip validation does not check envelope availability; this should not occur.
|
||||
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
|
||||
return None;
|
||||
}
|
||||
Err(e @ BlockError::EnvelopeError(_)) => {
|
||||
debug!(error = %e, "Gossip block envelope error");
|
||||
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
|
||||
return None;
|
||||
}
|
||||
Err(e @ BlockError::PayloadEnvelopeError { .. }) => {
|
||||
debug!(error = %e, "Gossip block payload envelope error");
|
||||
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL);
|
||||
|
||||
@@ -33,6 +33,7 @@ pub type BatchId = Epoch;
|
||||
#[strum(serialize_all = "snake_case")]
|
||||
pub enum ByRangeRequestType {
|
||||
BlocksAndColumns,
|
||||
BlocksAndEnvelopesAndColumns,
|
||||
BlocksAndBlobs,
|
||||
Blocks,
|
||||
Columns(HashSet<u64>),
|
||||
|
||||
@@ -4,11 +4,13 @@ use beacon_chain::{
|
||||
data_availability_checker::DataAvailabilityChecker,
|
||||
data_column_verification::CustodyDataColumn,
|
||||
get_block_root,
|
||||
payload_envelope_verification::AvailableEnvelope,
|
||||
};
|
||||
use lighthouse_network::{
|
||||
PeerId,
|
||||
service::api_types::{
|
||||
BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId,
|
||||
PayloadEnvelopesByRangeRequestId,
|
||||
},
|
||||
};
|
||||
use ssz_types::RuntimeVariableList;
|
||||
@@ -16,7 +18,7 @@ use std::{collections::HashMap, sync::Arc};
|
||||
use tracing::{Span, debug};
|
||||
use types::{
|
||||
BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec,
|
||||
Hash256, SignedBeaconBlock,
|
||||
Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope,
|
||||
};
|
||||
|
||||
use crate::sync::network_context::MAX_COLUMN_RETRIES;
|
||||
@@ -35,6 +37,13 @@ use crate::sync::network_context::MAX_COLUMN_RETRIES;
|
||||
pub struct RangeBlockComponentsRequest<E: EthSpec> {
|
||||
/// Blocks we have received awaiting for their corresponding sidecar.
|
||||
blocks_request: ByRangeRequest<BlocksByRangeRequestId, Vec<Arc<SignedBeaconBlock<E>>>>,
|
||||
/// Payload envelopes (Gloas+). None for pre-Gloas forks.
|
||||
payloads_request: Option<
|
||||
ByRangeRequest<
|
||||
PayloadEnvelopesByRangeRequestId,
|
||||
Vec<Arc<SignedExecutionPayloadEnvelope<E>>>,
|
||||
>,
|
||||
>,
|
||||
/// Sidecars we have received awaiting for their corresponding block.
|
||||
block_data_request: RangeBlockDataRequest<E>,
|
||||
/// Span to track the range request and all children range requests.
|
||||
@@ -88,6 +97,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
|
||||
Vec<(DataColumnsByRangeRequestId, Vec<ColumnIndex>)>,
|
||||
Vec<ColumnIndex>,
|
||||
)>,
|
||||
payloads_req_id: Option<PayloadEnvelopesByRangeRequestId>,
|
||||
request_span: Span,
|
||||
) -> Self {
|
||||
let block_data_request = if let Some(blobs_req_id) = blobs_req_id {
|
||||
@@ -109,6 +119,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
|
||||
|
||||
Self {
|
||||
blocks_request: ByRangeRequest::Active(blocks_req_id),
|
||||
payloads_request: payloads_req_id.map(ByRangeRequest::Active),
|
||||
block_data_request,
|
||||
request_span,
|
||||
}
|
||||
@@ -191,6 +202,18 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Adds received payload envelopes to the request.
|
||||
pub fn add_payload_envelopes(
|
||||
&mut self,
|
||||
req_id: PayloadEnvelopesByRangeRequestId,
|
||||
envelopes: Vec<Arc<SignedExecutionPayloadEnvelope<E>>>,
|
||||
) -> Result<(), String> {
|
||||
match &mut self.payloads_request {
|
||||
Some(req) => req.finish(req_id, envelopes),
|
||||
None => Err("received payload envelopes but none expected".to_owned()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempts to construct RPC blocks from all received components.
|
||||
///
|
||||
/// Returns `None` if not all expected requests have completed.
|
||||
@@ -208,6 +231,13 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
|
||||
return None;
|
||||
};
|
||||
|
||||
// If payloads are expected, they must also be complete before we can produce responses.
|
||||
if let Some(payloads_req) = &self.payloads_request
|
||||
&& payloads_req.to_finished().is_none()
|
||||
{
|
||||
return None;
|
||||
}
|
||||
|
||||
// Increment the attempt once this function returns the response or errors
|
||||
match &mut self.block_data_request {
|
||||
RangeBlockDataRequest::NoData => Some(Self::responses_with_blobs(
|
||||
@@ -254,7 +284,20 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
|
||||
}
|
||||
}
|
||||
|
||||
let resp = Self::responses_with_custody_columns(
|
||||
// Gloas path: if payloads are present, produce Gloas blocks
|
||||
let resp = if let Some(payloads_req) = &self.payloads_request {
|
||||
let payloads = payloads_req.to_finished().expect("checked above").to_vec();
|
||||
Self::responses_with_envelopes_and_columns(
|
||||
blocks.to_vec(),
|
||||
payloads,
|
||||
data_columns,
|
||||
column_to_peer_id,
|
||||
expected_custody_columns,
|
||||
*attempt,
|
||||
spec,
|
||||
)
|
||||
} else {
|
||||
Self::responses_with_custody_columns(
|
||||
blocks.to_vec(),
|
||||
data_columns,
|
||||
column_to_peer_id,
|
||||
@@ -262,7 +305,8 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
|
||||
*attempt,
|
||||
da_checker,
|
||||
spec,
|
||||
);
|
||||
)
|
||||
};
|
||||
|
||||
if let Err(CouplingError::DataColumnPeerFailure {
|
||||
error: _,
|
||||
@@ -364,74 +408,190 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
|
||||
where
|
||||
T: BeaconChainTypes<EthSpec = E>,
|
||||
{
|
||||
// Group data columns by block_root and index
|
||||
let mut data_columns_by_block =
|
||||
HashMap::<Hash256, HashMap<ColumnIndex, Arc<DataColumnSidecar<E>>>>::new();
|
||||
let mut columns_by_root = Self::group_columns_by_root(data_columns);
|
||||
let mut range_sync_blocks = Vec::with_capacity(blocks.len());
|
||||
let exceeded_retries = attempt >= MAX_COLUMN_RETRIES;
|
||||
|
||||
for block in blocks {
|
||||
let block_root = get_block_root(&block);
|
||||
range_sync_blocks.push(if block.num_expected_blobs() > 0 {
|
||||
// Safe to convert to `CustodyDataColumn`: we have asserted that the index of
|
||||
// this column is in the set of `expects_custody_columns` and with the expected
|
||||
// block root, so for the expected epoch of this batch.
|
||||
let columns = Self::extract_custody_columns_for_root(
|
||||
block_root,
|
||||
&mut columns_by_root,
|
||||
expects_custody_columns,
|
||||
&column_to_peer,
|
||||
exceeded_retries,
|
||||
)?;
|
||||
let custody_columns = columns
|
||||
.into_iter()
|
||||
.map(CustodyDataColumn::from_asserted_custody)
|
||||
.collect::<Vec<_>>();
|
||||
let block_data = AvailableBlockData::new_with_data_columns(
|
||||
custody_columns
|
||||
.iter()
|
||||
.map(|c| c.as_data_column().clone())
|
||||
.collect::<Vec<_>>(),
|
||||
);
|
||||
RangeSyncBlock::new(block, block_data, &da_checker, spec.clone())
|
||||
.map_err(|e| CouplingError::InternalError(format!("{:?}", e)))?
|
||||
} else {
|
||||
RangeSyncBlock::new(block, AvailableBlockData::NoData, &da_checker, spec.clone())
|
||||
.map_err(|e| CouplingError::InternalError(format!("{:?}", e)))?
|
||||
});
|
||||
}
|
||||
|
||||
if !columns_by_root.is_empty() {
|
||||
let remaining_roots = columns_by_root.keys().collect::<Vec<_>>();
|
||||
debug!(?remaining_roots, "Not all columns consumed for block");
|
||||
}
|
||||
|
||||
Ok(range_sync_blocks)
|
||||
}
|
||||
|
||||
/// Couples blocks with payload envelopes and custody columns for Gloas range sync.
|
||||
fn responses_with_envelopes_and_columns(
|
||||
blocks: Vec<Arc<SignedBeaconBlock<E>>>,
|
||||
payloads: Vec<Arc<SignedExecutionPayloadEnvelope<E>>>,
|
||||
data_columns: DataColumnSidecarList<E>,
|
||||
column_to_peer: HashMap<u64, PeerId>,
|
||||
expects_custody_columns: &[ColumnIndex],
|
||||
attempt: usize,
|
||||
_spec: Arc<ChainSpec>,
|
||||
) -> Result<Vec<RangeSyncBlock<E>>, CouplingError> {
|
||||
let mut columns_by_root = Self::group_columns_by_root(data_columns);
|
||||
let mut range_sync_blocks = Vec::with_capacity(blocks.len());
|
||||
let mut payload_iter = payloads.into_iter().peekable();
|
||||
let exceeded_retries = attempt >= MAX_COLUMN_RETRIES;
|
||||
|
||||
for block in blocks {
|
||||
let mut envelope_for_block = None;
|
||||
if payload_iter
|
||||
.peek()
|
||||
.map(|e| e.message.slot() == block.slot())
|
||||
.unwrap_or(false)
|
||||
{
|
||||
envelope_for_block = payload_iter.next();
|
||||
}
|
||||
|
||||
let block_root = get_block_root(&block);
|
||||
|
||||
let available_envelope = if block.num_expected_blobs() > 0 {
|
||||
let envelope = envelope_for_block.ok_or_else(|| {
|
||||
CouplingError::InternalError(format!(
|
||||
"Missing payload envelope for block {block_root:?} with blobs"
|
||||
))
|
||||
})?;
|
||||
let custody_columns = Self::extract_custody_columns_for_root(
|
||||
block_root,
|
||||
&mut columns_by_root,
|
||||
expects_custody_columns,
|
||||
&column_to_peer,
|
||||
exceeded_retries,
|
||||
)?;
|
||||
Some(Box::new(AvailableEnvelope::new(
|
||||
envelope,
|
||||
custody_columns,
|
||||
None,
|
||||
)))
|
||||
} else {
|
||||
envelope_for_block
|
||||
.map(|envelope| Box::new(AvailableEnvelope::new(envelope, vec![], None)))
|
||||
};
|
||||
|
||||
range_sync_blocks.push(RangeSyncBlock::new_gloas(block, available_envelope));
|
||||
}
|
||||
|
||||
if payload_iter.next().is_some() {
|
||||
let remaining = payload_iter.count() + 1;
|
||||
debug!(
|
||||
remaining,
|
||||
"Received payload envelopes that don't pair with blocks"
|
||||
);
|
||||
}
|
||||
|
||||
if !columns_by_root.is_empty() {
|
||||
let remaining_roots = columns_by_root.keys().collect::<Vec<_>>();
|
||||
debug!(
|
||||
?remaining_roots,
|
||||
"Not all columns consumed for Gloas blocks"
|
||||
);
|
||||
}
|
||||
|
||||
Ok(range_sync_blocks)
|
||||
}
|
||||
|
||||
/// Groups data columns by their block root, logging and skipping duplicates.
|
||||
fn group_columns_by_root(
|
||||
data_columns: DataColumnSidecarList<E>,
|
||||
) -> HashMap<Hash256, HashMap<ColumnIndex, Arc<DataColumnSidecar<E>>>> {
|
||||
let mut by_root =
|
||||
HashMap::<Hash256, HashMap<ColumnIndex, Arc<DataColumnSidecar<E>>>>::new();
|
||||
for column in data_columns {
|
||||
let block_root = column.block_root();
|
||||
let index = *column.index();
|
||||
if data_columns_by_block
|
||||
if by_root
|
||||
.entry(block_root)
|
||||
.or_default()
|
||||
.insert(index, column)
|
||||
.is_some()
|
||||
{
|
||||
// `DataColumnsByRangeRequestItems` ensures that we do not request any duplicated indices across all peers
|
||||
// we request the data from.
|
||||
// If there are duplicated indices, its likely a peer sending us the same index multiple times.
|
||||
// However we can still proceed even if there are extra columns, just log an error.
|
||||
// `DataColumnsByRangeRequestItems` ensures no duplicated indices across peers.
|
||||
// Duplicates are likely a peer sending the same index multiple times; log and skip.
|
||||
debug!(?block_root, ?index, "Repeated column for block_root");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
by_root
|
||||
}
|
||||
|
||||
// Now iterate all blocks ensuring that the block roots of each block and data column match,
|
||||
// plus we have columns for our custody requirements
|
||||
let mut range_sync_blocks = Vec::with_capacity(blocks.len());
|
||||
|
||||
let exceeded_retries = attempt >= MAX_COLUMN_RETRIES;
|
||||
for block in blocks {
|
||||
let block_root = get_block_root(&block);
|
||||
range_sync_blocks.push(if block.num_expected_blobs() > 0 {
|
||||
let Some(mut data_columns_by_index) = data_columns_by_block.remove(&block_root)
|
||||
else {
|
||||
/// Extracts and validates custody columns for a single block root.
|
||||
///
|
||||
/// Removes the matching entry from `columns_by_root`, checks all expected indices are
|
||||
/// present, and logs any extras. Returns the raw columns; callers wrap them as needed.
|
||||
fn extract_custody_columns_for_root(
|
||||
block_root: Hash256,
|
||||
columns_by_root: &mut HashMap<Hash256, HashMap<ColumnIndex, Arc<DataColumnSidecar<E>>>>,
|
||||
expects_custody_columns: &[ColumnIndex],
|
||||
column_to_peer: &HashMap<u64, PeerId>,
|
||||
exceeded_retries: bool,
|
||||
) -> Result<Vec<Arc<DataColumnSidecar<E>>>, CouplingError> {
|
||||
let Some(mut by_index) = columns_by_root.remove(&block_root) else {
|
||||
let responsible_peers = column_to_peer.iter().map(|c| (*c.0, *c.1)).collect();
|
||||
return Err(CouplingError::DataColumnPeerFailure {
|
||||
error: format!("No columns for block {block_root:?} with data"),
|
||||
faulty_peers: responsible_peers,
|
||||
exceeded_retries,
|
||||
|
||||
});
|
||||
};
|
||||
|
||||
let mut custody_columns = vec![];
|
||||
let mut columns = vec![];
|
||||
let mut naughty_peers = vec![];
|
||||
for index in expects_custody_columns {
|
||||
// Safe to convert to `CustodyDataColumn`: we have asserted that the index of
|
||||
// this column is in the set of `expects_custody_columns` and with the expected
|
||||
// block root, so for the expected epoch of this batch.
|
||||
if let Some(data_column) = data_columns_by_index.remove(index) {
|
||||
custody_columns.push(CustodyDataColumn::from_asserted_custody(data_column));
|
||||
if let Some(col) = by_index.remove(index) {
|
||||
columns.push(col);
|
||||
} else {
|
||||
let Some(responsible_peer) = column_to_peer.get(index) else {
|
||||
return Err(CouplingError::InternalError(format!("Internal error, no request made for column {}", index)));
|
||||
return Err(CouplingError::InternalError(format!(
|
||||
"Internal error, no request made for column {index}"
|
||||
)));
|
||||
};
|
||||
naughty_peers.push((*index, *responsible_peer));
|
||||
}
|
||||
}
|
||||
if !naughty_peers.is_empty() {
|
||||
return Err(CouplingError::DataColumnPeerFailure {
|
||||
error: format!("Peers did not return column for block_root {block_root:?} {naughty_peers:?}"),
|
||||
error: format!(
|
||||
"Peers did not return column for block_root {block_root:?} {naughty_peers:?}"
|
||||
),
|
||||
faulty_peers: naughty_peers,
|
||||
exceeded_retries
|
||||
exceeded_retries,
|
||||
});
|
||||
}
|
||||
|
||||
// Assert that there are no columns left
|
||||
if !data_columns_by_index.is_empty() {
|
||||
let remaining_indices = data_columns_by_index.keys().collect::<Vec<_>>();
|
||||
// log the error but don't return an error, we can still progress with extra columns.
|
||||
if !by_index.is_empty() {
|
||||
let remaining_indices = by_index.keys().collect::<Vec<_>>();
|
||||
debug!(
|
||||
?block_root,
|
||||
?remaining_indices,
|
||||
@@ -439,26 +599,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
|
||||
);
|
||||
}
|
||||
|
||||
let block_data = AvailableBlockData::new_with_data_columns(custody_columns.iter().map(|c| c.as_data_column().clone()).collect::<Vec<_>>());
|
||||
|
||||
RangeSyncBlock::new(block, block_data, &da_checker, spec.clone())
|
||||
.map_err(|e| CouplingError::InternalError(format!("{:?}", e)))?
|
||||
} else {
|
||||
// Block has no data, expects zero columns
|
||||
RangeSyncBlock::new(block, AvailableBlockData::NoData, &da_checker, spec.clone())
|
||||
.map_err(|e| CouplingError::InternalError(format!("{:?}", e)))?
|
||||
});
|
||||
}
|
||||
|
||||
// Assert that there are no columns left for other blocks
|
||||
if !data_columns_by_block.is_empty() {
|
||||
let remaining_roots = data_columns_by_block.keys().collect::<Vec<_>>();
|
||||
// log the error but don't return an error, we can still progress with responses.
|
||||
// this is most likely an internal error with overrequesting or a client bug.
|
||||
debug!(?remaining_roots, "Not all columns consumed for block");
|
||||
}
|
||||
|
||||
Ok(range_sync_blocks)
|
||||
Ok(columns)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -494,6 +635,8 @@ mod tests {
|
||||
NumBlobs, generate_rand_block_and_blobs, generate_rand_block_and_data_columns,
|
||||
test_da_checker, test_spec,
|
||||
};
|
||||
use bls::Signature;
|
||||
use lighthouse_network::service::api_types::PayloadEnvelopesByRangeRequestId;
|
||||
use lighthouse_network::{
|
||||
PeerId,
|
||||
service::api_types::{
|
||||
@@ -504,7 +647,11 @@ mod tests {
|
||||
use rand::SeedableRng;
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use tracing::Span;
|
||||
use types::{Epoch, ForkName, MinimalEthSpec as E, SignedBeaconBlock, test_utils::XorShiftRng};
|
||||
use types::{
|
||||
Epoch, EthSpec, ExecutionBlockHash, ExecutionPayloadEnvelope, ExecutionPayloadGloas,
|
||||
ExecutionRequests, ForkName, Hash256, MinimalEthSpec as E, SignedBeaconBlock,
|
||||
SignedExecutionPayloadEnvelope, Slot, test_utils::XorShiftRng,
|
||||
};
|
||||
|
||||
fn components_id() -> ComponentsByRangeRequestId {
|
||||
ComponentsByRangeRequestId {
|
||||
@@ -566,7 +713,7 @@ mod tests {
|
||||
|
||||
let blocks_req_id = blocks_id(components_id());
|
||||
let mut info =
|
||||
RangeBlockComponentsRequest::<E>::new(blocks_req_id, None, None, Span::none());
|
||||
RangeBlockComponentsRequest::<E>::new(blocks_req_id, None, None, None, Span::none());
|
||||
|
||||
// Send blocks and complete terminate response
|
||||
info.add_blocks(blocks_req_id, blocks).unwrap();
|
||||
@@ -596,6 +743,7 @@ mod tests {
|
||||
blocks_req_id,
|
||||
Some(blobs_req_id),
|
||||
None,
|
||||
None,
|
||||
Span::none(),
|
||||
);
|
||||
|
||||
@@ -655,6 +803,7 @@ mod tests {
|
||||
blocks_req_id,
|
||||
None,
|
||||
Some((columns_req_id.clone(), expects_custody_columns.clone())),
|
||||
None,
|
||||
Span::none(),
|
||||
);
|
||||
// Send blocks and complete terminate response
|
||||
@@ -731,6 +880,7 @@ mod tests {
|
||||
blocks_req_id,
|
||||
None,
|
||||
Some((columns_req_id.clone(), expected_sampling_columns.clone())),
|
||||
None,
|
||||
Span::none(),
|
||||
);
|
||||
|
||||
@@ -823,6 +973,7 @@ mod tests {
|
||||
blocks_req_id,
|
||||
None,
|
||||
Some((columns_req_id.clone(), expected_sampling_columns.clone())),
|
||||
None,
|
||||
Span::none(),
|
||||
);
|
||||
|
||||
@@ -920,6 +1071,7 @@ mod tests {
|
||||
blocks_req_id,
|
||||
None,
|
||||
Some((columns_req_id.clone(), expected_sampling_columns.clone())),
|
||||
None,
|
||||
Span::none(),
|
||||
);
|
||||
|
||||
@@ -1035,6 +1187,7 @@ mod tests {
|
||||
blocks_req_id,
|
||||
None,
|
||||
Some((columns_req_id.clone(), expected_sampling_columns.clone())),
|
||||
None,
|
||||
Span::none(),
|
||||
);
|
||||
|
||||
@@ -1106,4 +1259,171 @@ mod tests {
|
||||
panic!("Expected PeerFailure error with exceeded_retries=true");
|
||||
}
|
||||
}
|
||||
|
||||
// --- Gloas tests ---
|
||||
|
||||
fn make_gloas_envelope<E: EthSpec>(
|
||||
slot: Slot,
|
||||
rng: &mut impl rand::Rng,
|
||||
) -> Arc<SignedExecutionPayloadEnvelope<E>> {
|
||||
let envelope = ExecutionPayloadEnvelope {
|
||||
payload: ExecutionPayloadGloas {
|
||||
slot_number: slot,
|
||||
block_hash: ExecutionBlockHash::from_root(Hash256::from(rng.random::<[u8; 32]>())),
|
||||
..ExecutionPayloadGloas::default()
|
||||
},
|
||||
execution_requests: ExecutionRequests::default(),
|
||||
builder_index: 0,
|
||||
beacon_block_root: Hash256::from(rng.random::<[u8; 32]>()),
|
||||
parent_beacon_block_root: Hash256::repeat_byte(0),
|
||||
};
|
||||
Arc::new(SignedExecutionPayloadEnvelope {
|
||||
message: envelope,
|
||||
signature: Signature::empty(),
|
||||
})
|
||||
}
|
||||
|
||||
fn envelope_id(
|
||||
parent_request_id: ComponentsByRangeRequestId,
|
||||
) -> PayloadEnvelopesByRangeRequestId {
|
||||
use lighthouse_network::service::api_types::PayloadEnvelopesByRangeRequestId;
|
||||
PayloadEnvelopesByRangeRequestId {
|
||||
id: 99,
|
||||
parent_request_id,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn gloas_blocks_couple_with_envelopes() {
|
||||
let mut spec = test_spec::<E>();
|
||||
spec.deneb_fork_epoch = Some(Epoch::new(0));
|
||||
spec.fulu_fork_epoch = Some(Epoch::new(0));
|
||||
spec.gloas_fork_epoch = Some(Epoch::new(0));
|
||||
let spec = Arc::new(spec);
|
||||
let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode));
|
||||
let mut rng = XorShiftRng::from_seed([42; 16]);
|
||||
|
||||
let blocks = (0..4)
|
||||
.map(|_| {
|
||||
let (raw_block, _) = generate_rand_block_and_data_columns::<E>(
|
||||
ForkName::Gloas,
|
||||
NumBlobs::None,
|
||||
&mut rng,
|
||||
&spec,
|
||||
);
|
||||
Arc::new(raw_block) as Arc<SignedBeaconBlock<E>>
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Build envelopes with slots matching each block
|
||||
let envelopes: Vec<Arc<SignedExecutionPayloadEnvelope<E>>> = blocks
|
||||
.iter()
|
||||
.map(|b| make_gloas_envelope::<E>(b.slot(), &mut rng))
|
||||
.collect();
|
||||
|
||||
let components_id = components_id();
|
||||
let blocks_req_id = blocks_id(components_id);
|
||||
let env_req_id = envelope_id(components_id);
|
||||
|
||||
let mut info = RangeBlockComponentsRequest::<E>::new(
|
||||
blocks_req_id,
|
||||
None,
|
||||
None,
|
||||
Some(env_req_id),
|
||||
Span::none(),
|
||||
);
|
||||
|
||||
info.add_blocks(blocks_req_id, blocks).unwrap();
|
||||
// Not finished — envelopes still pending
|
||||
assert!(!is_finished(&mut info));
|
||||
|
||||
info.add_payload_envelopes(env_req_id, envelopes).unwrap();
|
||||
|
||||
let result = info.responses(da_checker, spec).unwrap();
|
||||
assert!(result.is_ok());
|
||||
assert_eq!(result.unwrap().len(), 4);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn gloas_blocks_without_envelopes_succeed() {
|
||||
// Blocks with no blobs don't require envelopes — they should couple fine with an empty envelope response.
|
||||
let mut spec = test_spec::<E>();
|
||||
spec.deneb_fork_epoch = Some(Epoch::new(0));
|
||||
spec.fulu_fork_epoch = Some(Epoch::new(0));
|
||||
spec.gloas_fork_epoch = Some(Epoch::new(0));
|
||||
let spec = Arc::new(spec);
|
||||
let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode));
|
||||
let mut rng = XorShiftRng::from_seed([42; 16]);
|
||||
|
||||
let (raw_block, _) = generate_rand_block_and_data_columns::<E>(
|
||||
ForkName::Gloas,
|
||||
NumBlobs::None,
|
||||
&mut rng,
|
||||
&spec,
|
||||
);
|
||||
let block: Arc<SignedBeaconBlock<E>> = Arc::new(raw_block);
|
||||
|
||||
let components_id = components_id();
|
||||
let blocks_req_id = blocks_id(components_id);
|
||||
let env_req_id = envelope_id(components_id);
|
||||
|
||||
let mut info = RangeBlockComponentsRequest::<E>::new(
|
||||
blocks_req_id,
|
||||
None,
|
||||
None,
|
||||
Some(env_req_id),
|
||||
Span::none(),
|
||||
);
|
||||
|
||||
info.add_blocks(blocks_req_id, vec![block]).unwrap();
|
||||
// No envelope for this block (peer didn't send one)
|
||||
info.add_payload_envelopes(env_req_id, vec![]).unwrap();
|
||||
|
||||
let result = info.responses(da_checker, spec).unwrap();
|
||||
assert!(result.is_ok(), "expected Ok, got: {:?}", result);
|
||||
assert_eq!(result.unwrap().len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn gloas_extra_envelopes_are_ignored() {
|
||||
let mut spec = test_spec::<E>();
|
||||
spec.deneb_fork_epoch = Some(Epoch::new(0));
|
||||
spec.fulu_fork_epoch = Some(Epoch::new(0));
|
||||
spec.gloas_fork_epoch = Some(Epoch::new(0));
|
||||
let spec = Arc::new(spec);
|
||||
let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode));
|
||||
let mut rng = XorShiftRng::from_seed([99; 16]);
|
||||
|
||||
let (raw_block, _) = generate_rand_block_and_data_columns::<E>(
|
||||
ForkName::Gloas,
|
||||
NumBlobs::None,
|
||||
&mut rng,
|
||||
&spec,
|
||||
);
|
||||
let block: Arc<SignedBeaconBlock<E>> = Arc::new(raw_block);
|
||||
let slot = block.slot();
|
||||
|
||||
let components_id = components_id();
|
||||
let blocks_req_id = blocks_id(components_id);
|
||||
let env_req_id = envelope_id(components_id);
|
||||
|
||||
let mut info = RangeBlockComponentsRequest::<E>::new(
|
||||
blocks_req_id,
|
||||
None,
|
||||
None,
|
||||
Some(env_req_id),
|
||||
Span::none(),
|
||||
);
|
||||
|
||||
info.add_blocks(blocks_req_id, vec![block]).unwrap();
|
||||
// Two envelopes: one matching, one extra at a different slot
|
||||
let env1 = make_gloas_envelope::<E>(slot, &mut rng);
|
||||
let env2 = make_gloas_envelope::<E>(Slot::new(slot.as_u64() + 10), &mut rng);
|
||||
info.add_payload_envelopes(env_req_id, vec![env1, env2])
|
||||
.unwrap();
|
||||
|
||||
let result = info.responses(da_checker, spec).unwrap();
|
||||
assert!(result.is_ok());
|
||||
assert_eq!(result.unwrap().len(), 1);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -59,7 +59,8 @@ use lighthouse_network::service::api_types::{
|
||||
BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId,
|
||||
CustodyBackFillBatchRequestId, CustodyBackfillBatchId, CustodyRequester,
|
||||
DataColumnsByRangeRequestId, DataColumnsByRangeRequester, DataColumnsByRootRequestId,
|
||||
DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId,
|
||||
DataColumnsByRootRequester, Id, PayloadEnvelopesByRangeRequestId, SingleLookupReqId,
|
||||
SyncRequestId,
|
||||
};
|
||||
use lighthouse_network::types::{NetworkGlobals, SyncState};
|
||||
use lighthouse_network::{PeerAction, PeerId};
|
||||
@@ -73,7 +74,8 @@ use strum::IntoStaticStr;
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::{debug, error, info, trace};
|
||||
use types::{
|
||||
BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock, Slot,
|
||||
BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock,
|
||||
SignedExecutionPayloadEnvelope, Slot,
|
||||
};
|
||||
|
||||
/// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync
|
||||
@@ -512,6 +514,11 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
SyncRequestId::DataColumnsByRange(req_id) => {
|
||||
self.on_data_columns_by_range_response(req_id, peer_id, RpcEvent::RPCError(error))
|
||||
}
|
||||
SyncRequestId::SinglePayloadEnvelope { id } => {
|
||||
self.on_single_envelope_response(id, peer_id, RpcEvent::RPCError(error))
|
||||
}
|
||||
SyncRequestId::PayloadEnvelopesByRange(req_id) => self
|
||||
.on_payload_envelopes_by_range_response(req_id, peer_id, RpcEvent::RPCError(error)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1335,6 +1342,36 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
}
|
||||
}
|
||||
|
||||
fn on_single_envelope_response(
|
||||
&mut self,
|
||||
id: SingleLookupReqId,
|
||||
peer_id: PeerId,
|
||||
rpc_event: RpcEvent<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>,
|
||||
) {
|
||||
// Placeholder: by-root envelope lookup not yet implemented for range sync.
|
||||
// This is called on error injection for disconnected peers. Log and ignore.
|
||||
let _ = (id, peer_id, rpc_event);
|
||||
debug!("on_single_envelope_response: not yet implemented");
|
||||
}
|
||||
|
||||
fn on_payload_envelopes_by_range_response(
|
||||
&mut self,
|
||||
id: PayloadEnvelopesByRangeRequestId,
|
||||
peer_id: PeerId,
|
||||
envelope: RpcEvent<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>,
|
||||
) {
|
||||
if let Some(resp) = self
|
||||
.network
|
||||
.on_payload_envelopes_by_range_response(id, peer_id, envelope)
|
||||
{
|
||||
self.on_range_components_response(
|
||||
id.parent_request_id,
|
||||
peer_id,
|
||||
RangeBlockComponent::PayloadEnvelope(id, resp),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn on_custody_by_root_result(
|
||||
&mut self,
|
||||
requester: CustodyRequester,
|
||||
|
||||
@@ -22,14 +22,17 @@ use beacon_chain::block_verification_types::{AsBlock, RangeSyncBlock};
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState};
|
||||
use custody::CustodyRequestResult;
|
||||
use fnv::FnvHashMap;
|
||||
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, DataColumnsByRangeRequest};
|
||||
use lighthouse_network::rpc::methods::{
|
||||
BlobsByRangeRequest, DataColumnsByRangeRequest, PayloadEnvelopesByRangeRequest,
|
||||
};
|
||||
use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError, RequestType};
|
||||
pub use lighthouse_network::service::api_types::RangeRequestId;
|
||||
use lighthouse_network::service::api_types::{
|
||||
AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId,
|
||||
CustodyBackFillBatchRequestId, CustodyBackfillBatchId, CustodyId, CustodyRequester,
|
||||
DataColumnsByRangeRequestId, DataColumnsByRangeRequester, DataColumnsByRootRequestId,
|
||||
DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId,
|
||||
DataColumnsByRootRequester, Id, PayloadEnvelopesByRangeRequestId, SingleLookupReqId,
|
||||
SyncRequestId,
|
||||
};
|
||||
use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource};
|
||||
use parking_lot::RwLock;
|
||||
@@ -37,6 +40,7 @@ pub use requests::LookupVerifyError;
|
||||
use requests::{
|
||||
ActiveRequests, BlobsByRangeRequestItems, BlobsByRootRequestItems, BlocksByRangeRequestItems,
|
||||
BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems,
|
||||
PayloadEnvelopesByRangeRequestItems,
|
||||
};
|
||||
#[cfg(test)]
|
||||
use slot_clock::SlotClock;
|
||||
@@ -52,7 +56,7 @@ use tracing::{Span, debug, debug_span, error, warn};
|
||||
use types::data::FixedBlobSidecarList;
|
||||
use types::{
|
||||
BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec,
|
||||
ForkContext, Hash256, SignedBeaconBlock, Slot,
|
||||
ForkContext, Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot,
|
||||
};
|
||||
|
||||
pub mod custody;
|
||||
@@ -213,6 +217,11 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
|
||||
/// A mapping of active DataColumnsByRange requests
|
||||
data_columns_by_range_requests:
|
||||
ActiveRequests<DataColumnsByRangeRequestId, DataColumnsByRangeRequestItems<T::EthSpec>>,
|
||||
/// A mapping of active PayloadEnvelopesByRange requests
|
||||
payload_envelopes_by_range_requests: ActiveRequests<
|
||||
PayloadEnvelopesByRangeRequestId,
|
||||
PayloadEnvelopesByRangeRequestItems<T::EthSpec>,
|
||||
>,
|
||||
/// Mapping of active custody column requests for a block root
|
||||
custody_by_root_requests: FnvHashMap<CustodyRequester, ActiveCustodyRequest<T>>,
|
||||
|
||||
@@ -250,6 +259,10 @@ pub enum RangeBlockComponent<E: EthSpec> {
|
||||
DataColumnsByRangeRequestId,
|
||||
RpcResponseResult<Vec<Arc<DataColumnSidecar<E>>>>,
|
||||
),
|
||||
PayloadEnvelope(
|
||||
PayloadEnvelopesByRangeRequestId,
|
||||
RpcResponseResult<Vec<Arc<SignedExecutionPayloadEnvelope<E>>>>,
|
||||
),
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -298,6 +311,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
blocks_by_range_requests: ActiveRequests::new("blocks_by_range"),
|
||||
blobs_by_range_requests: ActiveRequests::new("blobs_by_range"),
|
||||
data_columns_by_range_requests: ActiveRequests::new("data_columns_by_range"),
|
||||
payload_envelopes_by_range_requests: ActiveRequests::new("payload_envelopes_by_range"),
|
||||
custody_by_root_requests: <_>::default(),
|
||||
components_by_range_requests: FnvHashMap::default(),
|
||||
custody_backfill_data_column_batch_requests: FnvHashMap::default(),
|
||||
@@ -326,6 +340,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
blocks_by_range_requests,
|
||||
blobs_by_range_requests,
|
||||
data_columns_by_range_requests,
|
||||
payload_envelopes_by_range_requests,
|
||||
// custody_by_root_requests is a meta request of data_columns_by_root_requests
|
||||
custody_by_root_requests: _,
|
||||
// components_by_range_requests is a meta request of various _by_range requests
|
||||
@@ -361,12 +376,17 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
.active_requests_of_peer(peer_id)
|
||||
.into_iter()
|
||||
.map(|req_id| SyncRequestId::DataColumnsByRange(*req_id));
|
||||
let payload_envelope_by_range_ids = payload_envelopes_by_range_requests
|
||||
.active_requests_of_peer(peer_id)
|
||||
.into_iter()
|
||||
.map(|req_id| SyncRequestId::PayloadEnvelopesByRange(*req_id));
|
||||
blocks_by_root_ids
|
||||
.chain(blobs_by_root_ids)
|
||||
.chain(data_column_by_root_ids)
|
||||
.chain(blocks_by_range_ids)
|
||||
.chain(blobs_by_range_ids)
|
||||
.chain(data_column_by_range_ids)
|
||||
.chain(payload_envelope_by_range_ids)
|
||||
.collect()
|
||||
}
|
||||
|
||||
@@ -423,6 +443,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
blocks_by_range_requests,
|
||||
blobs_by_range_requests,
|
||||
data_columns_by_range_requests,
|
||||
payload_envelopes_by_range_requests,
|
||||
// custody_by_root_requests is a meta request of data_columns_by_root_requests
|
||||
custody_by_root_requests: _,
|
||||
// components_by_range_requests is a meta request of various _by_range requests
|
||||
@@ -445,6 +466,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
.chain(blocks_by_range_requests.iter_request_peers())
|
||||
.chain(blobs_by_range_requests.iter_request_peers())
|
||||
.chain(data_columns_by_range_requests.iter_request_peers())
|
||||
.chain(payload_envelopes_by_range_requests.iter_request_peers())
|
||||
{
|
||||
*active_request_count_by_peer.entry(peer_id).or_default() += 1;
|
||||
}
|
||||
@@ -577,8 +599,10 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
};
|
||||
|
||||
// Attempt to find all required custody peers before sending any request or creating an ID
|
||||
let columns_by_range_peers_to_request =
|
||||
if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) {
|
||||
let columns_by_range_peers_to_request = if matches!(
|
||||
batch_type,
|
||||
ByRangeRequestType::BlocksAndColumns | ByRangeRequestType::BlocksAndEnvelopesAndColumns
|
||||
) {
|
||||
let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch());
|
||||
let column_indexes = self
|
||||
.chain
|
||||
@@ -659,6 +683,28 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
.transpose()?;
|
||||
|
||||
let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch());
|
||||
|
||||
// Send envelope request for Gloas epochs
|
||||
let payloads_req_id =
|
||||
if matches!(batch_type, ByRangeRequestType::BlocksAndEnvelopesAndColumns) {
|
||||
Some(self.send_payload_envelopes_by_range_request(
|
||||
block_peer,
|
||||
PayloadEnvelopesByRangeRequest {
|
||||
start_slot: *request.start_slot(),
|
||||
count: *request.count(),
|
||||
},
|
||||
id,
|
||||
new_range_request_span!(
|
||||
self,
|
||||
"outgoing_envelopes_by_range",
|
||||
range_request_span.clone(),
|
||||
block_peer
|
||||
),
|
||||
)?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let info = RangeBlockComponentsRequest::new(
|
||||
blocks_req_id,
|
||||
blobs_req_id,
|
||||
@@ -668,6 +714,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
self.chain.sampling_columns_for_epoch(epoch).to_vec(),
|
||||
)
|
||||
}),
|
||||
payloads_req_id,
|
||||
range_request_span,
|
||||
);
|
||||
self.components_by_range_requests.insert(id, info);
|
||||
@@ -770,6 +817,17 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
})
|
||||
})
|
||||
}
|
||||
RangeBlockComponent::PayloadEnvelope(req_id, resp) => {
|
||||
resp.and_then(|(envelopes, _)| {
|
||||
request
|
||||
.add_payload_envelopes(req_id, envelopes)
|
||||
.map_err(|e| {
|
||||
RpcResponseError::BlockComponentCouplingError(
|
||||
CouplingError::InternalError(e),
|
||||
)
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
} {
|
||||
entry.remove();
|
||||
@@ -1295,6 +1353,57 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
Ok((id, requested_columns))
|
||||
}
|
||||
|
||||
fn send_payload_envelopes_by_range_request(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request: PayloadEnvelopesByRangeRequest,
|
||||
parent_request_id: ComponentsByRangeRequestId,
|
||||
request_span: Span,
|
||||
) -> Result<PayloadEnvelopesByRangeRequestId, RpcRequestSendError> {
|
||||
let id = PayloadEnvelopesByRangeRequestId {
|
||||
id: self.next_id(),
|
||||
parent_request_id,
|
||||
};
|
||||
|
||||
self.send_network_msg(NetworkMessage::SendRequest {
|
||||
peer_id,
|
||||
request: RequestType::PayloadEnvelopesByRange(request.clone()),
|
||||
app_request_id: AppRequestId::Sync(SyncRequestId::PayloadEnvelopesByRange(id)),
|
||||
})
|
||||
.map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?;
|
||||
|
||||
debug!(
|
||||
method = "PayloadEnvelopesByRange",
|
||||
slots = request.count,
|
||||
epoch = %Slot::new(request.start_slot).epoch(T::EthSpec::slots_per_epoch()),
|
||||
peer = %peer_id,
|
||||
%id,
|
||||
"Sync RPC request sent"
|
||||
);
|
||||
|
||||
self.payload_envelopes_by_range_requests.insert(
|
||||
id,
|
||||
peer_id,
|
||||
false,
|
||||
PayloadEnvelopesByRangeRequestItems::new(request),
|
||||
request_span,
|
||||
);
|
||||
Ok(id)
|
||||
}
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
pub(crate) fn on_payload_envelopes_by_range_response(
|
||||
&mut self,
|
||||
id: PayloadEnvelopesByRangeRequestId,
|
||||
peer_id: PeerId,
|
||||
rpc_event: RpcEvent<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>,
|
||||
) -> Option<RpcResponseResult<Vec<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>>> {
|
||||
let resp = self
|
||||
.payload_envelopes_by_range_requests
|
||||
.on_response(id, rpc_event);
|
||||
self.on_rpc_response_result(resp, peer_id)
|
||||
}
|
||||
|
||||
pub fn is_execution_engine_online(&self) -> bool {
|
||||
self.execution_engine_state == EngineState::Online
|
||||
}
|
||||
@@ -1376,6 +1485,12 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
);
|
||||
|
||||
if self
|
||||
.chain
|
||||
.data_availability_checker
|
||||
.envelopes_required_for_epoch(epoch)
|
||||
{
|
||||
ByRangeRequestType::BlocksAndEnvelopesAndColumns
|
||||
} else if self
|
||||
.chain
|
||||
.data_availability_checker
|
||||
.data_columns_required_for_epoch(epoch)
|
||||
@@ -1795,6 +1910,10 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
"data_columns_by_range",
|
||||
self.data_columns_by_range_requests.len(),
|
||||
),
|
||||
(
|
||||
"payload_envelopes_by_range",
|
||||
self.payload_envelopes_by_range_requests.len(),
|
||||
),
|
||||
("custody_by_root", self.custody_by_root_requests.len()),
|
||||
(
|
||||
"components_by_range",
|
||||
|
||||
@@ -16,6 +16,7 @@ pub use data_columns_by_range::DataColumnsByRangeRequestItems;
|
||||
pub use data_columns_by_root::{
|
||||
DataColumnsByRootRequestItems, DataColumnsByRootSingleBlockRequest,
|
||||
};
|
||||
pub use payload_envelopes_by_range::PayloadEnvelopesByRangeRequestItems;
|
||||
|
||||
use crate::metrics;
|
||||
|
||||
@@ -27,6 +28,7 @@ mod blocks_by_range;
|
||||
mod blocks_by_root;
|
||||
mod data_columns_by_range;
|
||||
mod data_columns_by_root;
|
||||
mod payload_envelopes_by_range;
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, IntoStaticStr)]
|
||||
pub enum LookupVerifyError {
|
||||
|
||||
@@ -0,0 +1,42 @@
|
||||
use super::{ActiveRequestItems, LookupVerifyError};
|
||||
use lighthouse_network::rpc::methods::PayloadEnvelopesByRangeRequest;
|
||||
use std::sync::Arc;
|
||||
use types::{EthSpec, SignedExecutionPayloadEnvelope};
|
||||
|
||||
/// Accumulates results of a payload_envelopes_by_range request. Only returns items after
|
||||
/// receiving the stream termination.
|
||||
pub struct PayloadEnvelopesByRangeRequestItems<E: EthSpec> {
|
||||
request: PayloadEnvelopesByRangeRequest,
|
||||
items: Vec<Arc<SignedExecutionPayloadEnvelope<E>>>,
|
||||
}
|
||||
|
||||
impl<E: EthSpec> PayloadEnvelopesByRangeRequestItems<E> {
|
||||
pub fn new(request: PayloadEnvelopesByRangeRequest) -> Self {
|
||||
Self {
|
||||
request,
|
||||
items: vec![],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: EthSpec> ActiveRequestItems for PayloadEnvelopesByRangeRequestItems<E> {
|
||||
type Item = Arc<SignedExecutionPayloadEnvelope<E>>;
|
||||
|
||||
fn add(&mut self, envelope: Self::Item) -> Result<bool, LookupVerifyError> {
|
||||
let slot = envelope.slot();
|
||||
if slot < self.request.start_slot || slot >= self.request.start_slot + self.request.count {
|
||||
return Err(LookupVerifyError::UnrequestedSlot(slot));
|
||||
}
|
||||
if self.items.iter().any(|existing| existing.slot() == slot) {
|
||||
return Err(LookupVerifyError::DuplicatedData(slot, 0));
|
||||
}
|
||||
|
||||
self.items.push(envelope);
|
||||
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
fn consume(&mut self) -> Vec<Self::Item> {
|
||||
std::mem::take(&mut self.items)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user