Compare commits

...

100 Commits

Author SHA1 Message Date
Eitan Seri-Levi
2ab1ffe10d fix 2026-04-28 22:34:45 +02:00
Eitan Seri-Levi
43abc3313e resolve merge conflicts 2026-04-28 22:20:41 +02:00
Eitan Seri-Levi
d5d415b8b8 Merge branch 'glamsterdam-devnet-0' of https://github.com/sigp/lighthouse into glamsterdam-devnet-0 2026-04-28 16:11:09 +02:00
Daniel Knopik
c457d2932f fix ci sorry 2026-04-28 15:54:47 +02:00
Eitan Seri-Levi
11dce65a99 Merge branch 'glamsterdam-devnet-0' of https://github.com/sigp/lighthouse into glamsterdam-devnet-0 2026-04-28 15:39:54 +02:00
Daniel Knopik
02e18d1e95 implement gloas data column verification 2026-04-28 10:44:56 +02:00
Eitan Seri-Levi
3d9c6628c8 Merge branch 'glamsterdam-devnet-0' of https://github.com/eserilev/lighthouse into glamsterdam-devnet-0 2026-04-28 10:39:49 +02:00
Eitan Seri-Levi
5c157185bf Merge branch 'unstable' of https://github.com/sigp/lighthouse into glamsterdam-devnet-0 2026-04-28 10:39:37 +02:00
Eitan Seri-Levi
f446ca2530 Merge remote-tracking branch 'origin/gloas-filter-conflicting-voluntairy-exits' into glamsterdam-devnet-0 2026-04-28 10:39:26 +02:00
Eitan Seri-Levi
6f5cbf1a0c Merge branch 'unstable' of https://github.com/sigp/lighthouse into gloas-filter-conflicting-voluntairy-exits 2026-04-28 09:07:41 +02:00
Eitan Seri-Levi
9ba4889f4c fix 2026-04-28 08:18:50 +02:00
Eitan Seri-Levi
d86fd5bb6f merge conflicts 2026-04-28 01:06:02 +02:00
Eitan Seri-Levi
e8ec40a233 merge conflicts 2026-04-28 00:44:24 +02:00
Eitan Seri-Levi
2f6f9b51ad Merge remote-tracking branch 'king/fix-gloas-kurtosis-genesis' into unstable 2026-04-28 00:12:58 +02:00
Eitan Seri-Levi
1681b8335f Merge remote-tracking branch 'jimmy/gloas-ptc-pool-block-packing' into unstable 2026-04-28 00:12:16 +02:00
Eitan Seri-Levi
36993c106e Merge conflicts 2026-04-28 00:11:22 +02:00
Eitan Seri-Levi
e179fc2870 Merge remote-tracking branch 'origin/gloas-default-to-available-envelope' into unstable 2026-04-28 00:10:41 +02:00
Eitan Seri-Levi
b0255efde6 Merge remote-tracking branch 'origin/gloas-parent-envelope-unknown-lookup' into unstable 2026-04-28 00:10:17 +02:00
Jimmy Chen
fb02b67555 Remove unnecessary check 2026-04-28 00:09:13 +02:00
Jimmy Chen
77d288cb48 Fix payload attestation service using stale slot after sleep 2026-04-27 23:44:03 +02:00
Josh King
afc2346d9e fix: gloas from genesis
- Fix forkchoice update sending zero-hash head to EL at genesis by reading
  latest_block_hash from state when the genesis bid hashes are all zeros
- Simplify genesis block construction — the genesis block body is empty per
  spec, remove the incorrect bid-copying logic and body root override in
  initialize_beacon_state_from_eth1
2026-04-27 23:26:29 +02:00
Eitan Seri-Levi
9debb1a30b merge conflicts 2026-04-27 22:47:19 +02:00
Eitan Seri-Levi
d2ec1cda51 merge conflicts 2026-04-27 17:16:11 +02:00
Jimmy Chen
b90787f8bc Merge branch 'unstable' into gloas-ptc-pool-block-packing 2026-04-27 17:14:00 +02:00
Jimmy Chen
a519120a37 Cap payload attestations to MaxPayloadAttestations 2026-04-27 17:09:35 +02:00
Eitan Seri-Levi
daab3408aa spawn a task for col construction 2026-04-27 17:08:16 +02:00
Eitan Seri-Levi
fadc055250 Fixes 2026-04-27 17:00:41 +02:00
Eitan Seri-Levi
5db022a442 Merge branch 'gloas-publish-data-columns-during-local-block-building' of https://github.com/eserilev/lighthouse into gloas-publish-data-columns-during-local-block-building 2026-04-27 14:53:03 +02:00
Eitan Seri-Levi
6b3b6ccf51 Clean up and self review 2026-04-27 14:52:52 +02:00
Eitan Seri-Levi
61b5fe8ec8 Merge branch 'unstable' into gloas-publish-data-columns-during-local-block-building 2026-04-27 14:33:28 +02:00
Eitan Seri-Levi
356c1fc659 Clean up 2026-04-27 14:27:20 +02:00
Eitan Seri-Levi
4699bbc069 fmt 2026-04-27 13:55:31 +02:00
Eitan Seri-Levi
4d99d1a3b5 filter voluntary exits that conflict w/ parent envelope execution requests 2026-04-27 13:55:19 +02:00
Jimmy Chen
9518eac182 Fix lint and incorrect condition 2026-04-27 13:40:03 +02:00
Eitan Seri-Levi
732123abbb linting 2026-04-27 13:14:48 +02:00
Jimmy Chen
0e9107f0be Clean ups 2026-04-27 13:13:45 +02:00
Jimmy Chen
1543db8b87 Add payload attestation pool and block packing
Store gossip-verified PayloadAttestationMessages in the operation pool
keyed by PayloadAttestationData. At block production time, aggregate
them into PayloadAttestations using PTC position mapping and include
in the block body. Wire pool insertion into the gossip handler after
fork choice.
2026-04-27 13:13:45 +02:00
Eitan Seri-Levi
4746dc5629 publish columns 2026-04-27 13:02:08 +02:00
Eitan Seri-Levi
7d5a59cd07 LOL 2026-04-27 10:45:15 +02:00
Eitan Seri-Levi
9bb72ab65f Merge branch 'gloas-ptc-gossip-verification' into glamsterdam-devnet-0 2026-04-27 10:40:15 +02:00
Eitan Seri-Levi
5c221e8a83 skip all 2026-04-27 10:34:31 +02:00
Eitan Seri-Levi
ea9664dc91 Drop read lock 2026-04-27 10:32:48 +02:00
Eitan Seri-Levi
f2f051db45 merge conflicts 2026-04-27 10:10:41 +02:00
Eitan Seri-Levi
812dc31b15 Fix 2026-04-27 10:10:06 +02:00
Eitan Seri-Levi
ffedc9d65b merge ptc branch chagnes 2026-04-27 09:58:01 +02:00
Eitan Seri-Levi
68bfb33430 Add tests and ssz suppport 2026-04-27 09:46:09 +02:00
Eitan Seri-Levi
d42784229a fmt 2026-04-27 09:09:17 +02:00
Eitan Seri-Levi
0a1bdf840b Add payload attestation validator duty 2026-04-27 08:49:45 +02:00
Eitan Seri-Levi
0fe60af63a Add payload attestation validator duty 2026-04-27 08:48:06 +02:00
hopinheimer
2f98ca6d55 fmt 2026-04-27 01:57:44 -04:00
hopinheimer
aca9765ae7 fix genesis_block init in tests 2026-04-27 01:57:22 -04:00
hopinheimer
8d3bda0115 fmt 2026-04-27 01:15:12 -04:00
hopinheimer
c261d8687a Merge branch 'unstable' of github.com:sigp/lighthouse into gloas-ptc-gossip-verification 2026-04-27 01:06:03 -04:00
hopinheimer
774b6dca92 fetching from hot_state in case of liveness fault 2026-04-27 01:04:15 -04:00
Eitan Seri-Levi
80f1b4b521 merge conflicts 2026-04-26 20:05:49 +02:00
hopinheimer
98fe0bbda5 merge with unstable 2026-04-25 17:57:30 -04:00
Eitan Seri-Levi
030146f560 Merge branch 'gloas-attestation-index' into glamsterdam-devnet-0 2026-04-25 23:29:49 +02:00
Eitan Seri-Levi
98212e8815 Fix builder exit signature batch verification logic and small refactor 2026-04-25 23:10:15 +02:00
Eitan Seri-Levi
4a2a7e2a4e Merge branch 'gloas-parent-envelope-unknown-lookup' into glamsterdam-devnet-0 2026-04-25 15:31:55 +02:00
Eitan Seri-Levi
eadf7f2d30 Merge branch 'unstable' of https://github.com/sigp/lighthouse into gloas-default-to-available-envelope 2026-04-25 15:31:09 +02:00
Eitan Seri-Levi
269e474f49 Resolve merge conflicts 2026-04-25 17:14:57 +09:00
Eitan Seri-Levi
3f8621fd52 Disable early attester cache test for non same slot attestaitons psot gloas 2026-04-25 16:48:56 +09:00
Eitan Seri-Levi
9ef3799c36 Add same slot attestation logic to early attester cache 2026-04-25 16:34:17 +09:00
Eitan Seri-Levi
a7fc388a9a Fix early attester cache 2026-04-25 16:15:54 +09:00
Eitan Seri-Levi
5beddd17ce Fix tests 2026-04-25 15:52:11 +09:00
Eitan Seri-Levi
2e561a2a9f Handle historic attestations correctly 2026-04-25 15:44:33 +09:00
Eitan Seri-Levi
dacb8aeffe Small fix 2026-04-25 15:30:34 +09:00
Eitan Seri-Levi
7e16aadde5 Lint 2026-04-25 14:32:49 +09:00
Eitan Seri-Levi
1229abf5cf Fix test 2026-04-25 14:22:24 +09:00
Eitan Seri-Levi
b36219d83d Add canonicity check and tests 2026-04-25 14:08:55 +09:00
Eitan Seri-Levi
2a9948d042 Merge branch 'unstable' into gloas-attestation-index 2026-04-25 13:27:32 +09:00
Eitan Seri-Levi
ca59cf453e Merge conflicts' 2026-04-23 02:34:34 +09:00
Eitan Seri-Levi
755b8d8510 resolve merge conlfict 2026-04-23 02:15:26 +09:00
Eitan Seri-Levi
f2ce147f79 Update 2026-04-23 01:52:47 +09:00
Eitan Seri-Levi
29f6dfa460 Default envelope to always avail 2026-04-23 01:47:47 +09:00
hopinheimer
8e9627ce11 consolidate IGNORE cases and tests 2026-04-20 23:49:09 -04:00
hopinheimer
96feda027d invert BeaconChain dependency 2026-04-19 13:33:22 -04:00
hopinheimer
6f53220c48 Merge branch 'unstable' into gloas-ptc-gossip-verification 2026-04-19 01:37:48 -04:00
hopinheimer
9c9ba192b4 fmt 2026-04-19 01:31:33 -04:00
hopinheimer
eef1bf6bb3 shifting payload_attestation_verification to separate module 2026-04-19 01:20:30 -04:00
hopinheimer
ec111259c1 adding PayloadAttestationMessage to RejectedPayloadAttestation 2026-04-18 01:13:00 -04:00
hopinheimer
e0b9802569 adding metrics 2026-04-17 19:44:04 -04:00
hopinheimer
4bbc74cf59 wiring up process_gossip_payload_attestation and implement observe cache 2026-04-17 19:43:49 -04:00
hopinheimer
036d9c995d adding payload verification handlers 2026-04-17 19:40:15 -04:00
Eitan Seri- Levi
2b1f043521 FMT 2026-04-06 02:15:02 -07:00
Eitan Seri- Levi
2c8df63f00 allow too many args 2026-04-06 02:01:50 -07:00
Eitan Seri- Levi
474d0cc36f Set gloas attestation data index to 0 or 1 depending on payload 2026-04-06 01:53:47 -07:00
Eitan Seri-Levi
20f0c7bf4b Merge branch 'unstable' into gloas-parent-envelope-unknown-lookup 2026-04-05 14:37:53 +09:00
Eitan Seri-Levi
34e5f89537 Apply suggestion from @eserilev 2026-04-03 17:42:41 +09:00
Eitan Seri-Levi
3112792435 Apply suggestion from @eserilev 2026-04-03 17:42:22 +09:00
Eitan Seri- Levi
e7dd95131d Resolve merge conflicts 2026-04-03 01:40:40 -07:00
Eitan Seri- Levi
b333841229 update 2026-04-03 01:04:34 -07:00
Eitan Seri- Levi
f897215684 refactor awaiting_parent field and some metrics 2026-04-03 01:02:57 -07:00
Eitan Seri- Levi
214e3ce9f0 Cleanup 2026-04-03 00:02:24 -07:00
Eitan Seri- Levi
1cd4d57204 Fixes 2026-04-02 19:37:51 -07:00
Eitan Seri- Levi
3523804515 cleanup 2026-04-02 19:30:12 -07:00
Eitan Seri- Levi
86ddd0d88d Add EnvelopeRequestState logic 2026-04-02 19:09:56 -07:00
Eitan Seri-Levi
93cfa0ffdb Merge branch 'unstable' into gloas-parent-envelope-unknown-lookup 2026-04-02 21:43:18 +09:00
Eitan Seri- Levi
1eefef610e Resolve merge conflicts 2026-03-31 11:04:23 -07:00
Eitan Seri- Levi
09e9a54314 When a block comes in whose parent is unkown, queue the block for processing and lookup the parent envelope 2026-03-26 23:40:35 -07:00
30 changed files with 1271 additions and 206 deletions

View File

@@ -2038,7 +2038,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
});
}
if request_slot >= head_state.slot() {
let is_attesting_to_head_slot = request_slot >= head_state.slot();
if is_attesting_to_head_slot {
// When attesting to the head slot or later, always use the head of the chain.
beacon_block_root = head.beacon_block_root;
beacon_state_root = head.beacon_state_root();

View File

@@ -60,6 +60,7 @@ use crate::execution_payload::{
};
use crate::kzg_utils::blobs_to_data_column_sidecars;
use crate::observed_block_producers::SeenBlock;
use crate::payload_envelope_verification::EnvelopeError;
use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS;
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use crate::{
@@ -320,6 +321,18 @@ pub enum BlockError {
bid_parent_root: Hash256,
block_parent_root: Hash256,
},
/// The block is known but its parent execution payload envelope has not been received yet.
///
/// ## Peer scoring
///
/// It's unclear if this block is valid, but it cannot be fully verified without the parent's
/// execution payload envelope.
ParentEnvelopeUnknown { parent_root: Hash256 },
PayloadEnvelopeError {
e: Box<EnvelopeError>,
penalize_peer: bool,
},
}
/// Which specific signature(s) are invalid in a SignedBeaconBlock
@@ -486,6 +499,36 @@ impl From<ArithError> for BlockError {
}
}
impl From<EnvelopeError> for BlockError {
fn from(e: EnvelopeError) -> Self {
let penalize_peer = match &e {
// REJECT per spec: peer sent invalid envelope data
EnvelopeError::BadSignature
| EnvelopeError::BuilderIndexMismatch { .. }
| EnvelopeError::BlockHashMismatch { .. }
| EnvelopeError::SlotMismatch { .. }
| EnvelopeError::IncorrectBlockProposer { .. } => true,
// IGNORE per spec: not the peer's fault
EnvelopeError::BlockRootUnknown { .. }
| EnvelopeError::PriorToFinalization { .. }
| EnvelopeError::UnknownValidator { .. } => false,
// Internal errors: not the peer's fault
EnvelopeError::BeaconChainError(_)
| EnvelopeError::BeaconStateError(_)
| EnvelopeError::BlockProcessingError(_)
| EnvelopeError::EnvelopeProcessingError(_)
| EnvelopeError::ExecutionPayloadError(_)
| EnvelopeError::BlockError(_)
| EnvelopeError::InternalError(_)
| EnvelopeError::OptimisticSyncNotSupported { .. } => false,
};
BlockError::PayloadEnvelopeError {
e: Box::new(e),
penalize_peer,
}
}
}
/// Stores information about verifying a payload against an execution engine.
#[derive(Debug, PartialEq, Clone, Encode, Decode)]
pub struct PayloadVerificationOutcome {
@@ -897,12 +940,22 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
});
}
// TODO(gloas) The following validation can only be completed once fork choice has been implemented:
// The block's parent execution payload (defined by bid.parent_block_hash) has been seen
// (via gossip or non-gossip sources) (a client MAY queue blocks for processing
// once the parent payload is retrieved). If execution_payload verification of block's execution
// payload parent by an execution node is complete, verify the block's execution payload
// parent (defined by bid.parent_block_hash) passes all validation.
// Check that we've received the parent envelope. If not, issue a single envelope
// lookup for the parent and queue this block in the reprocess queue.
// Skip this check for the genesis block — it's a synthetic anchor with no envelope.
let parent_is_gloas = chain
.spec
.fork_name_at_slot::<T::EthSpec>(parent_block.slot)
.gloas_enabled();
if parent_is_gloas
&& parent_block.slot != chain.spec.genesis_slot
&& !fork_choice_read_lock.is_payload_received(&block.message().parent_root())
{
return Err(BlockError::ParentEnvelopeUnknown {
parent_root: block.message().parent_root(),
});
}
drop(fork_choice_read_lock);
@@ -1951,7 +2004,6 @@ fn load_parent<T: BeaconChainTypes, B: AsBlock<T::EthSpec>>(
// Retrieve any state that is advanced through to at most `block.slot()`: this is
// particularly important if `block` descends from the finalized/split block, but at a slot
// prior to the finalized slot (which is invalid and inaccessible in our DB schema).
//
let (parent_state_root, state) = chain
.store
.get_advanced_hot_state(root, block.slot(), parent_block.state_root())?

View File

@@ -1177,8 +1177,8 @@ fn make_genesis_block<E: EthSpec>(
genesis_state: &mut BeaconState<E>,
spec: &ChainSpec,
) -> Result<SignedBeaconBlock<E>, String> {
let mut block = genesis_block(genesis_state, spec)
.map_err(|e| format!("Error building genesis block: {:?}", e))?;
let mut block =
genesis_block(spec).map_err(|e| format!("Error building genesis block: {:?}", e))?;
*block.state_root_mut() = genesis_state
.update_tree_hash_cache()

View File

@@ -20,6 +20,7 @@ use std::iter;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
use store::DatabaseBlock;
use tracing::{debug, instrument};
use tree_hash::TreeHash;
use types::data::{
@@ -28,7 +29,8 @@ use types::data::{
};
use types::{
BeaconStateError, ChainSpec, DataColumnSidecar, DataColumnSidecarFulu, DataColumnSubnetId,
EthSpec, Hash256, PartialDataColumnSidecarRef, SignedBeaconBlockHeader, Slot,
EthSpec, Hash256, KzgCommitments, PartialDataColumnSidecarRef, SignedBeaconBlock,
SignedBeaconBlockHeader, Slot,
};
/// An error occurred while validating a gossip data column.
@@ -131,6 +133,25 @@ pub enum GossipDataColumnError {
parent_root: Hash256,
slot: Slot,
},
/// The block for this Gloas data column's `beacon_block_root` is not yet known.
/// The sidecar should be queued and retried when the block arrives.
///
/// ## Peer scoring
///
/// We cannot process the columns without validating its block, the peer isn't necessarily faulty.
BlockUnknown {
beacon_block_root: Hash256,
slot: Slot,
},
/// The sidecar's slot does not match the block's slot.
///
/// ## Peer scoring
///
/// The data column sidecar is invalid and the peer is faulty.
SlotMismatch {
sidecar_slot: Slot,
block_slot: Slot,
},
/// The column conflicts with finalization, no need to propagate.
///
/// ## Peer scoring
@@ -319,8 +340,11 @@ impl<T: BeaconChainTypes, O: ObservationStrategy> GossipVerifiedDataColumn<T, O>
)
})
}
// TODO(gloas) support gloas data column variant
DataColumnSidecar::Gloas(_) => Err(GossipDataColumnError::InvalidVariant),
DataColumnSidecar::Gloas(_) => validate_data_column_sidecar_for_gossip_gloas::<T, O>(
column_sidecar,
subnet_id,
chain,
),
}
}
@@ -329,9 +353,21 @@ impl<T: BeaconChainTypes, O: ObservationStrategy> GossipVerifiedDataColumn<T, O>
/// When publishing a block constructed externally, there will be no columns here.
pub fn new_for_block_publishing(
column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>,
block: &SignedBeaconBlock<T::EthSpec>,
chain: &BeaconChain<T>,
) -> Result<Self, GossipDataColumnError> {
verify_data_column_sidecar(&column_sidecar, &chain.spec)?;
let commitments_len = match column_sidecar.as_ref() {
DataColumnSidecar::Fulu(dc) => dc.kzg_commitments.len(),
DataColumnSidecar::Gloas(_) => {
let bid = block
.message()
.body()
.signed_execution_payload_bid()
.map_err(|_| GossipDataColumnError::InvalidVariant)?;
bid.message.blob_kzg_commitments.len()
}
};
verify_data_column_sidecar(&column_sidecar, commitments_len, &chain.spec)?;
// Check if the data column is already in the DA checker cache. This happens when data columns
// are made available through the `engine_getBlobs` method. If it exists in the cache, we know
@@ -860,6 +896,28 @@ pub fn verify_kzg_for_data_column<E: EthSpec>(
})
}
/// Complete kzg verification for a `DataColumnSidecar` using externally provided commitments.
/// Used for Gloas where commitments come from the block's execution payload bid.
#[instrument(skip_all, level = "debug")]
pub fn verify_kzg_for_data_column_with_commitments<E: EthSpec>(
data_column: Arc<DataColumnSidecar<E>>,
cells_to_verify: PartialDataColumnSidecarRef<E>,
kzg_commitments: &KzgCommitments<E>,
kzg: &Kzg,
seen_timestamp: Duration,
) -> Result<KzgVerifiedDataColumn<E>, (Option<ColumnIndex>, KzgError)> {
let _timer = metrics::start_timer(&metrics::KZG_VERIFICATION_DATA_COLUMN_SINGLE_TIMES);
validate_partial_data_columns(
kzg,
iter::once((*data_column.index(), cells_to_verify)),
kzg_commitments,
)?;
Ok(KzgVerifiedDataColumn {
data: data_column,
seen_timestamp,
})
}
/// Complete kzg verification for a `VerifiablePartialDataColumn`.
///
/// Returns an error if the kzg verification check fails.
@@ -916,7 +974,11 @@ pub fn validate_data_column_sidecar_for_gossip_fulu<T: BeaconChainTypes, O: Obse
};
let column_slot = data_column.slot();
verify_data_column_sidecar(&data_column, &chain.spec)?;
verify_data_column_sidecar(
&data_column,
data_column_fulu.kzg_commitments.len(),
&chain.spec,
)?;
verify_index_matches_subnet(&data_column, subnet, &chain.spec)?;
verify_sidecar_not_from_future_slot(chain, column_slot)?;
verify_slot_greater_than_latest_finalized_slot(chain, column_slot)?;
@@ -984,6 +1046,109 @@ pub fn validate_data_column_sidecar_for_gossip_fulu<T: BeaconChainTypes, O: Obse
})
}
#[instrument(
skip_all,
name = "validate_data_column_sidecar_for_gossip_gloas",
level = "debug"
)]
pub fn validate_data_column_sidecar_for_gossip_gloas<
T: BeaconChainTypes,
O: ObservationStrategy,
>(
data_column: Arc<DataColumnSidecar<T::EthSpec>>,
subnet: DataColumnSubnetId,
chain: &BeaconChain<T>,
) -> Result<GossipVerifiedDataColumn<T, O>, GossipDataColumnError> {
let DataColumnSidecar::Gloas(_) = data_column.as_ref() else {
return Err(GossipDataColumnError::InvalidVariant);
};
let column_slot = data_column.slot();
let block_root = data_column.block_root();
verify_index_matches_subnet(&data_column, subnet, &chain.spec)?;
let bid_commitments = get_gloas_block_commitments(chain, &block_root, column_slot)?;
verify_data_column_sidecar(&data_column, bid_commitments.len(), &chain.spec)?;
verify_is_unknown_sidecar(chain, &data_column)?;
// Check DA cache for already-processed columns.
let Some(cells_to_kzg_verify) = chain
.data_availability_checker
.missing_cells_for_column_sidecar(&data_column)
.map_err(|err| match err {
MissingCellsError::MismatchesCachedColumn => {
GossipDataColumnError::MismatchesCachedColumn
}
MissingCellsError::UnexpectedError(_) => todo!("handle unexpected error"),
})?
else {
if O::observe() {
observe_gossip_data_column(&data_column, chain)?;
}
return Err(GossipDataColumnError::PriorKnownUnpublished);
};
// [REJECT] KZG proof verification using commitments from the bid.
let kzg = &chain.kzg;
let seen_timestamp = chain.slot_clock.now_duration().unwrap_or_default();
let kzg_verified_data_column = verify_kzg_for_data_column_with_commitments(
data_column.clone(),
cells_to_kzg_verify,
&bid_commitments,
kzg,
seen_timestamp,
)
.map_err(|(_, e)| GossipDataColumnError::InvalidKzgProof(e))?;
if O::observe() {
observe_gossip_data_column(&data_column, chain)?;
}
Ok(GossipVerifiedDataColumn {
block_root,
data_column: kzg_verified_data_column,
_phantom: PhantomData,
})
}
/// Look up the block for a Gloas data column sidecar and extract the bid's `blob_kzg_commitments`.
/// Returns `BlockUnknown` if the block is not found, `SlotMismatch` if slots don't match.
fn get_gloas_block_commitments<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
block_root: &Hash256,
sidecar_slot: Slot,
) -> Result<KzgCommitments<T::EthSpec>, GossipDataColumnError> {
let block = chain
.store
.try_get_full_block(block_root)
.map_err(|e| GossipDataColumnError::BeaconChainError(Box::new(e.into())))?;
// TODO(gloas): maybe also check DA cache? fork choice?
let block = match block {
Some(DatabaseBlock::Full(block)) => block,
Some(DatabaseBlock::Blinded(_)) | None => {
return Err(GossipDataColumnError::BlockUnknown {
beacon_block_root: *block_root,
slot: sidecar_slot,
});
}
};
if sidecar_slot != block.slot() {
return Err(GossipDataColumnError::SlotMismatch {
sidecar_slot,
block_slot: block.slot(),
});
}
let bid = block
.message()
.body()
.signed_execution_payload_bid()
.map_err(|e| GossipDataColumnError::BeaconChainError(Box::new(e.into())))?;
Ok(bid.message.blob_kzg_commitments.clone())
}
#[instrument(skip_all, level = "debug")]
pub fn validate_partial_data_column_sidecar_for_gossip<T: BeaconChainTypes>(
mut column: Box<PartialDataColumn<T::EthSpec>>,
@@ -1116,8 +1281,13 @@ pub enum PartialColumnVerificationResult<E: EthSpec> {
}
/// Verify if the data column sidecar is valid.
///
/// `commitments_len` is the number of KZG commitments associated with this block.
/// For Fulu, this comes from the sidecar's `kzg_commitments` field.
/// For Gloas, this comes from `block.body.signed_execution_payload_bid.message.blob_kzg_commitments`.
fn verify_data_column_sidecar<E: EthSpec>(
data_column: &DataColumnSidecar<E>,
commitments_len: usize,
spec: &ChainSpec,
) -> Result<(), GossipDataColumnError> {
if *data_column.index() >= E::number_of_columns() as u64 {
@@ -1126,12 +1296,6 @@ fn verify_data_column_sidecar<E: EthSpec>(
));
}
// TODO(gloas): implement Gloas verification that takes kzg_commitments from block as parameter
let commitments_len = match data_column {
DataColumnSidecar::Fulu(dc) => dc.kzg_commitments.len(),
DataColumnSidecar::Gloas(_) => return Err(GossipDataColumnError::InvalidVariant),
};
if commitments_len == 0 {
return Err(GossipDataColumnError::UnexpectedDataColumn);
}
@@ -1426,7 +1590,7 @@ mod test {
use types::{
Cell, CellBitmap, DataColumnSidecar, DataColumnSidecarFulu, DataColumnSubnetId, EthSpec,
ForkName, MainnetEthSpec, PartialDataColumn, PartialDataColumnHeader,
PartialDataColumnSidecar,
PartialDataColumnSidecar, SignedBeaconBlock,
};
type E = MainnetEthSpec;
@@ -1444,7 +1608,7 @@ mod test {
.build();
harness.advance_slot();
let verify_fn = |column_sidecar: DataColumnSidecar<E>| {
let verify_fn = |column_sidecar: DataColumnSidecar<E>, _| {
let col_index = *column_sidecar.index();
validate_data_column_sidecar_for_gossip_fulu::<_, Observe>(
column_sidecar.into(),
@@ -1469,9 +1633,10 @@ mod test {
.build();
harness.advance_slot();
let verify_fn = |column_sidecar: DataColumnSidecar<E>| {
let verify_fn = |column_sidecar: DataColumnSidecar<E>, block: Arc<SignedBeaconBlock<E>>| {
GossipVerifiedDataColumn::<_>::new_for_block_publishing(
column_sidecar.into(),
&block,
&harness.chain,
)
};
@@ -1482,7 +1647,10 @@ mod test {
// TODO(gloas) make this generic over gloas/fulu
async fn empty_data_column_sidecars_fails_validation_fulu<D>(
harness: &BeaconChainHarness<EphemeralHarnessType<E>>,
verify_fn: &impl Fn(DataColumnSidecar<E>) -> Result<D, GossipDataColumnError>,
verify_fn: impl Fn(
DataColumnSidecar<E>,
Arc<SignedBeaconBlock<E>>,
) -> Result<D, GossipDataColumnError>,
) {
let slot = harness.get_current_slot();
let state = harness.get_current_state();
@@ -1506,7 +1674,7 @@ mod test {
.unwrap(),
});
let result = verify_fn(column_sidecar);
let result = verify_fn(column_sidecar, block);
assert!(matches!(
result.err(),
Some(GossipDataColumnError::UnexpectedDataColumn)
@@ -1515,7 +1683,10 @@ mod test {
async fn data_column_sidecar_commitments_exceed_max_blobs_per_block<D>(
harness: &BeaconChainHarness<EphemeralHarnessType<E>>,
verify_fn: &impl Fn(DataColumnSidecar<E>) -> Result<D, GossipDataColumnError>,
verify_fn: &impl Fn(
DataColumnSidecar<E>,
Arc<SignedBeaconBlock<E>>,
) -> Result<D, GossipDataColumnError>,
) {
let slot = harness.get_current_slot();
let epoch = slot.epoch(E::slots_per_epoch());
@@ -1545,7 +1716,7 @@ mod test {
.next()
.unwrap();
let result = verify_fn(Arc::try_unwrap(column_sidecar).unwrap());
let result = verify_fn(Arc::try_unwrap(column_sidecar).unwrap(), block);
assert!(matches!(
result.err(),
Some(GossipDataColumnError::MaxBlobsPerBlockExceeded { .. })

View File

@@ -65,7 +65,7 @@ impl TestContext {
root: Hash256::ZERO,
};
let mut block = genesis_block(&state, &spec).expect("should build genesis block");
let mut block = genesis_block(&spec).expect("should build genesis block");
*block.state_root_mut() = state
.update_tree_hash_cache()
.expect("should hash genesis state");

View File

@@ -113,7 +113,7 @@ impl TestContext {
)
.expect("should register inactive builder");
let mut block = genesis_block(&state, &spec).expect("should build genesis block");
let mut block = genesis_block(&spec).expect("should build genesis block");
*block.state_root_mut() = state
.update_tree_hash_cache()
.expect("should hash genesis state");

View File

@@ -9,7 +9,7 @@ use crate::{
PayloadVerificationOutcome,
block_verification::PayloadVerificationHandle,
payload_envelope_verification::{
EnvelopeError, EnvelopeImportData, MaybeAvailableEnvelope,
AvailableEnvelope, EnvelopeError, EnvelopeImportData, MaybeAvailableEnvelope,
gossip_verified_envelope::GossipVerifiedEnvelope, load_snapshot_from_state_root,
payload_notifier::PayloadNotifier,
},
@@ -86,11 +86,17 @@ impl<T: BeaconChainTypes> GossipVerifiedEnvelope<T> {
&chain.spec,
)?;
// TODO(gloas) Since we dont have a DA cache we are always constructing an available envelope.
// once the da cache is implemented we should set this envelope to pending and let the da cache
// handle the availability logic.
Ok(ExecutionPendingEnvelope {
signed_envelope: MaybeAvailableEnvelope::AvailabilityPending {
block_hash: payload.block_hash,
envelope: signed_envelope,
},
signed_envelope: MaybeAvailableEnvelope::Available(AvailableEnvelope::new(
payload.block_hash,
signed_envelope,
vec![],
None,
chain.spec.clone(),
)),
import_data: EnvelopeImportData {
block_root,
_phantom: Default::default(),

View File

@@ -49,12 +49,12 @@ pub struct EnvelopeImportData<E: EthSpec> {
}
#[derive(Debug)]
#[allow(dead_code)]
pub struct AvailableEnvelope<E: EthSpec> {
#[expect(dead_code)]
execution_block_hash: ExecutionBlockHash,
envelope: Arc<SignedExecutionPayloadEnvelope<E>>,
columns: DataColumnSidecarList<E>,
/// Timestamp at which this envelope first became available (UNIX timestamp, time since 1970).
#[expect(dead_code)]
columns_available_timestamp: Option<std::time::Duration>,
pub spec: Arc<ChainSpec>,
}

View File

@@ -123,7 +123,6 @@ async fn gloas_envelope_blobs_produce_valid_columns() {
if !spec.is_gloas_scheduled() {
return;
}
let harness = get_harness(VALIDATOR_COUNT, spec.clone(), NodeCustodyType::Supernode);
harness.execution_block_generator().set_min_blob_count(1);

View File

@@ -417,6 +417,9 @@ pub enum Work<E: EthSpec> {
RpcBlobs {
process_fn: AsyncFn,
},
RpcPayloadEnvelope {
process_fn: AsyncFn,
},
RpcCustodyColumn(AsyncFn),
ColumnReconstruction(AsyncFn),
IgnoredRpcBlock {
@@ -483,6 +486,7 @@ pub enum WorkType {
GossipLightClientOptimisticUpdate,
RpcBlock,
RpcBlobs,
RpcPayloadEnvelope,
RpcCustodyColumn,
ColumnReconstruction,
IgnoredRpcBlock,
@@ -545,6 +549,7 @@ impl<E: EthSpec> Work<E> {
Work::GossipProposerPreferences(_) => WorkType::GossipProposerPreferences,
Work::RpcBlock { .. } => WorkType::RpcBlock,
Work::RpcBlobs { .. } => WorkType::RpcBlobs,
Work::RpcPayloadEnvelope { .. } => WorkType::RpcPayloadEnvelope,
Work::RpcCustodyColumn { .. } => WorkType::RpcCustodyColumn,
Work::ColumnReconstruction(_) => WorkType::ColumnReconstruction,
Work::IgnoredRpcBlock { .. } => WorkType::IgnoredRpcBlock,
@@ -1183,7 +1188,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::GossipLightClientOptimisticUpdate { .. } => work_queues
.lc_gossip_optimistic_update_queue
.push(work, work_id),
Work::RpcBlock { .. } | Work::IgnoredRpcBlock { .. } => {
Work::RpcBlock { .. }
| Work::IgnoredRpcBlock { .. }
| Work::RpcPayloadEnvelope { .. } => {
work_queues.rpc_block_queue.push(work, work_id)
}
Work::RpcBlobs { .. } => work_queues.rpc_blob_queue.push(work, work_id),
@@ -1318,7 +1325,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
WorkType::GossipLightClientOptimisticUpdate => {
work_queues.lc_gossip_optimistic_update_queue.len()
}
WorkType::RpcBlock => work_queues.rpc_block_queue.len(),
WorkType::RpcBlock | WorkType::RpcPayloadEnvelope => {
work_queues.rpc_block_queue.len()
}
WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => {
work_queues.rpc_blob_queue.len()
}
@@ -1513,6 +1522,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
beacon_block_root: _,
}
| Work::RpcBlobs { process_fn }
| Work::RpcPayloadEnvelope { process_fn }
| Work::RpcCustodyColumn(process_fn)
| Work::ColumnReconstruction(process_fn) => task_spawner.spawn_async(process_fn),
Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn),

View File

@@ -7,10 +7,13 @@ use crate::version::{
execution_optimistic_finalized_beacon_response,
};
use beacon_chain::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use beacon_chain::{BeaconChain, BeaconChainTypes};
use beacon_chain::payload_envelope_verification::EnvelopeError;
use beacon_chain::payload_envelope_verification::gossip_verified_envelope::GossipVerifiedEnvelope;
use beacon_chain::{BeaconChain, BeaconChainTypes, NotifyExecutionLayer};
use bytes::Bytes;
use eth2::types as api_types;
use eth2::{CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER};
use futures::TryFutureExt;
use lighthouse_network::PubsubMessage;
use network::NetworkMessage;
use ssz::{Decode, Encode};
@@ -18,7 +21,7 @@ use std::future::Future;
use std::sync::Arc;
use tokio::sync::mpsc::UnboundedSender;
use tracing::{debug, error, info, warn};
use types::{EthSpec, SignedExecutionPayloadEnvelope};
use types::{BlockImportSource, EthSpec, SignedBeaconBlock, SignedExecutionPayloadEnvelope};
use warp::{
Filter, Rejection, Reply,
hyper::{Body, Response},
@@ -88,9 +91,8 @@ pub(crate) fn post_beacon_execution_payload_envelope<T: BeaconChainTypes>(
)
.boxed()
}
/// Publishes a signed execution payload envelope to the network. Implements
/// `POST /eth/v1/beacon/execution_payload_envelope` per the in-flight beacon-APIs PR
/// <https://github.com/ethereum/beacon-APIs/pull/580>.
/// Publishes a signed execution payload envelope to the network.
pub async fn publish_execution_payload_envelope<T: BeaconChainTypes>(
envelope: SignedExecutionPayloadEnvelope<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
@@ -98,76 +100,91 @@ pub async fn publish_execution_payload_envelope<T: BeaconChainTypes>(
) -> Result<Response<Body>, Rejection> {
let slot = envelope.slot();
let beacon_block_root = envelope.message.beacon_block_root;
let builder_index = envelope.message.builder_index;
// TODO(gloas): Replace this check once we have gossip validation.
if !chain.spec.is_gloas_scheduled() {
return Err(warp_utils::reject::custom_bad_request(
"Execution payload envelopes are not supported before the Gloas fork".into(),
));
}
// TODO(gloas): We should probably add validation here i.e. BroadcastValidation::Gossip
info!(
%slot,
%beacon_block_root,
builder_index = envelope.message.builder_index,
"Publishing signed execution payload envelope to network"
);
let signed_envelope = Arc::new(envelope);
let blobs_and_proofs = chain.pending_payload_envelopes.write().take_blobs(slot);
// Spawn the column-build task (CPU-bound KZG cell-and-proof computation) before
// publishing the envelope so it runs in parallel with envelope gossip, narrowing
// the window in which peers see envelope-without-columns. If envelope publication
// fails below, dropping this future drops the spawned `JoinHandle` (the running
// closure on the blocking pool finishes and is then discarded — no work cancellation).
let column_build_future = match blobs_and_proofs {
Some(blobs) if !blobs.is_empty() => Some(spawn_build_gloas_data_columns_task(
&chain,
beacon_block_root,
slot,
blobs,
)?),
_ => None,
// The publish_fn is called inside process_execution_payload_envelope after consensus
// verification but before the EL call.
let envelope_for_publish = signed_envelope.clone();
let sender = network_tx.clone();
let publish_fn = move || {
info!(
%slot,
%beacon_block_root,
builder_index,
"Publishing signed execution payload envelope to network"
);
crate::utils::publish_pubsub_message(
&sender,
PubsubMessage::ExecutionPayload(Box::new((*envelope_for_publish).clone())),
)
.map_err(|_| {
warn!(%slot, "Failed to publish execution payload envelope to network");
EnvelopeError::InternalError(
"Unable to publish execution payload envelope to network".to_owned(),
)
})
};
// Publish the envelope to the network.
crate::utils::publish_pubsub_message(
network_tx,
PubsubMessage::ExecutionPayload(Box::new(envelope)),
)
.map_err(|_| {
warn!(%slot, "Failed to publish execution payload envelope to network");
warp_utils::reject::custom_server_error(
"Unable to publish execution payload envelope to network".into(),
)
})?;
let ctx = chain.payload_envelope_gossip_verification_context();
let gossip_verified_envelope = match GossipVerifiedEnvelope::new(signed_envelope, &ctx) {
Ok(envelope) => envelope,
Err(e) => {
warn!(%slot, %beacon_block_root, error = ?e, "Execution payload envelope rejected");
return Err(warp_utils::reject::custom_bad_request(format!(
"execution payload envelope rejected: {e:?}",
)));
}
};
let block = gossip_verified_envelope.block.clone();
// From here on the envelope is on the wire. `take_blobs` already consumed the cache
// entry, so a retry would not republish columns; returning Err would mislead the
// caller. Log column-build/publish failures and fall through to `Ok`.
if let Some(column_build_future) = column_build_future {
let gossip_verified_columns = match column_build_future.await {
Ok(columns) => columns,
Err(e) => {
error!(
%slot,
error = ?e,
"Failed to build data columns after envelope publication"
);
return Ok(warp::reply().into_response());
}
};
// Import the envelope locally (runs state transition and notifies the EL).
chain
.process_execution_payload_envelope(
beacon_block_root,
gossip_verified_envelope,
NotifyExecutionLayer::Yes,
BlockImportSource::HttpApi,
publish_fn,
)
.await
.map_err(|e| {
warn!(%slot, %beacon_block_root, reason = ?e, "Execution payload envelope rejected");
warp_utils::reject::custom_bad_request(format!(
"execution payload envelope rejected: {e:?}"
))
})?;
// Build and publish data column sidecars from the blobs.
if let Some(blobs) = blobs_and_proofs
&& !blobs.is_empty()
{
let gossip_verified_columns = spawn_build_gloas_data_columns_task(
chain.clone(),
beacon_block_root,
block,
slot,
blobs,
)?
.await?;
if !gossip_verified_columns.is_empty() {
if let Err(e) = publish_column_sidecars(network_tx, &gossip_verified_columns, &chain) {
error!(
%slot,
error = ?e,
"Failed to publish data column sidecars after envelope publication"
);
return Ok(warp::reply().into_response());
}
publish_column_sidecars(network_tx, &gossip_verified_columns, &chain).map_err(
|_| {
warp_utils::reject::custom_server_error(
"unable to publish data column sidecars".into(),
)
},
)?;
let epoch = slot.epoch(T::EthSpec::slots_per_epoch());
let sampling_column_indices = chain.sampling_columns_for_epoch(epoch);
@@ -176,7 +193,6 @@ pub async fn publish_execution_payload_envelope<T: BeaconChainTypes>(
.filter(|col| sampling_column_indices.contains(&col.index()))
.collect::<Vec<_>>();
// Local processing only — envelope already broadcast, so log and fall through.
if !sampling_columns.is_empty()
&& let Err(e) =
Box::pin(chain.process_gossip_data_columns(sampling_columns, || Ok(()))).await
@@ -194,30 +210,30 @@ pub async fn publish_execution_payload_envelope<T: BeaconChainTypes>(
}
fn spawn_build_gloas_data_columns_task<T: BeaconChainTypes>(
chain: &Arc<BeaconChain<T>>,
chain: Arc<BeaconChain<T>>,
beacon_block_root: types::Hash256,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
slot: types::Slot,
blobs: types::BlobsList<T::EthSpec>,
) -> Result<impl Future<Output = Result<Vec<GossipVerifiedDataColumn<T>>, Rejection>>, Rejection> {
let chain_for_build = chain.clone();
let handle = chain
chain
.clone()
.task_executor
.spawn_blocking_handle(
move || build_gloas_data_columns(&chain_for_build, beacon_block_root, slot, &blobs),
move || build_gloas_data_columns(&chain, beacon_block_root, block, slot, &blobs),
"build_gloas_data_columns",
)
.ok_or_else(|| warp_utils::reject::custom_server_error("runtime shutdown".to_string()))?;
Ok(async move {
handle
.await
.map_err(|_| warp_utils::reject::custom_server_error("join error".to_string()))?
})
.ok_or_else(|| warp_utils::reject::custom_server_error("runtime shutdown".to_string()))
.map(|r| {
r.map_err(|_| warp_utils::reject::custom_server_error("join error".to_string()))
.and_then(|output| async move { output })
})
}
fn build_gloas_data_columns<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
beacon_block_root: types::Hash256,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
slot: types::Slot,
blobs: &types::BlobsList<T::EthSpec>,
) -> Result<Vec<GossipVerifiedDataColumn<T>>, Rejection> {
@@ -242,7 +258,7 @@ fn build_gloas_data_columns<T: BeaconChainTypes>(
.into_iter()
.filter_map(|col| {
let index = *col.index();
match GossipVerifiedDataColumn::new_for_block_publishing(col, chain) {
match GossipVerifiedDataColumn::new_for_block_publishing(col, &block, chain) {
Ok(verified) => Some(verified),
Err(GossipDataColumnError::PriorKnownUnpublished) => None,
Err(e) => {

View File

@@ -417,7 +417,8 @@ fn build_data_columns<T: BeaconChainTypes>(
let gossip_verified_data_columns = data_column_sidecars
.into_iter()
.filter_map(|data_column_sidecar| {
GossipVerifiedDataColumn::new_for_block_publishing(data_column_sidecar, chain).ok()
GossipVerifiedDataColumn::new_for_block_publishing(data_column_sidecar, block, chain)
.ok()
})
.collect::<Vec<_>>();

View File

@@ -31,6 +31,8 @@ pub enum SyncRequestId {
BlobsByRange(BlobsByRangeRequestId),
/// Data columns by range request
DataColumnsByRange(DataColumnsByRangeRequestId),
/// Request searching for an execution payload envelope given a block root.
SinglePayloadEnvelope { id: SingleLookupReqId },
}
/// Request ID for data_columns_by_root requests. Block lookups do not issue this request directly.

View File

@@ -727,7 +727,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
%parent_root,
"Unknown parent hash for column"
);
self.send_sync_message(SyncMessage::UnknownParentDataColumn(
self.send_sync_message(SyncMessage::UnknownDataColumnParentOrBlock(
peer_id,
column_sidecar,
));
@@ -739,6 +739,20 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"Internal error when verifying column sidecar"
)
}
GossipDataColumnError::BlockUnknown {
beacon_block_root, ..
} => {
debug!(
action = "queuing for block",
%block_root,
%beacon_block_root,
"Block not yet known for Gloas data column sidecar"
);
self.send_sync_message(SyncMessage::UnknownDataColumnParentOrBlock(
peer_id,
column_sidecar,
));
}
GossipDataColumnError::ProposalSignatureInvalid
| GossipDataColumnError::UnknownValidator(_)
| GossipDataColumnError::ProposerIndexMismatch { .. }
@@ -752,7 +766,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
| GossipDataColumnError::MaxBlobsPerBlockExceeded { .. }
| GossipDataColumnError::InconsistentCommitmentsLength { .. }
| GossipDataColumnError::InconsistentProofsLength { .. }
| GossipDataColumnError::NotFinalizedDescendant { .. } => {
| GossipDataColumnError::NotFinalizedDescendant { .. }
| GossipDataColumnError::SlotMismatch { .. } => {
debug!(
error = ?err,
%slot,
@@ -933,6 +948,14 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
slot,
});
}
GossipDataColumnError::BlockUnknown { .. } => {
debug!(
%block_root,
%index,
"Block not yet known for Gloas partial data column"
);
// TODO(gloas): send sync message
}
GossipDataColumnError::PubkeyCacheTimeout
| GossipDataColumnError::BeaconChainError(_) => {
crit!(
@@ -953,7 +976,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
| GossipDataColumnError::MaxBlobsPerBlockExceeded { .. }
| GossipDataColumnError::InconsistentCommitmentsLength { .. }
| GossipDataColumnError::InconsistentProofsLength { .. }
| GossipDataColumnError::NotFinalizedDescendant { .. } => {
| GossipDataColumnError::NotFinalizedDescendant { .. }
| GossipDataColumnError::SlotMismatch { .. } => {
debug!(
error = ?err,
%block_root,
@@ -1738,6 +1762,17 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
self.send_sync_message(SyncMessage::UnknownParentBlock(peer_id, block, block_root));
return None;
}
Err(BlockError::ParentEnvelopeUnknown { parent_root }) => {
debug!(
?block_root,
?parent_root,
"Parent envelope not yet available for gossip block"
);
self.send_sync_message(SyncMessage::UnknownParentEnvelope(
peer_id, block, block_root,
));
return None;
}
Err(e @ BlockError::BeaconChainError(_)) => {
debug!(
error = ?e,
@@ -1827,7 +1862,9 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
return None;
}
// BlobNotRequired is unreachable. Only constructed in `process_gossip_blob`
Err(e @ BlockError::InternalError(_)) | Err(e @ BlockError::BlobNotRequired(_)) => {
Err(e @ BlockError::InternalError(_))
| Err(e @ BlockError::BlobNotRequired(_))
| Err(e @ BlockError::PayloadEnvelopeError { .. }) => {
error!(error = %e, "Internal block gossip validation error");
return None;
}
@@ -2024,6 +2061,16 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"Block with unknown parent attempted to be processed"
);
}
Err(BlockError::ParentEnvelopeUnknown { parent_root }) => {
debug!(
%block_root,
?parent_root,
"Parent envelope not yet available, need envelope lookup"
);
// Unlike ParentUnknown, this can legitimately happen during processing
// because the parent envelope may not have arrived yet. The lookup
// system will handle retrying via Action::ParentEnvelopeUnknown.
}
Err(e @ BlockError::ExecutionPayloadError(epe)) if !epe.penalize_peer() => {
debug!(
error = %e,

View File

@@ -567,6 +567,22 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
})
}
/// Create a new `Work` event for an RPC payload envelope.
pub fn send_rpc_payload_envelope(
self: &Arc<Self>,
envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
seen_timestamp: Duration,
process_type: BlockProcessType,
) -> Result<(), Error<T::EthSpec>> {
let process_fn =
self.clone()
.generate_rpc_envelope_process_fn(envelope, seen_timestamp, process_type);
self.try_send(BeaconWorkEvent {
drop_during_sync: false,
work: Work::RpcPayloadEnvelope { process_fn },
})
}
/// Create a new `Work` event for some blobs, where the result from computation (if any) is
/// sent to the other side of `result_tx`.
pub fn send_rpc_blobs(

View File

@@ -4,7 +4,7 @@ use crate::sync::BatchProcessResult;
use crate::sync::manager::CustodyBatchProcessResult;
use crate::sync::{
ChainId,
manager::{BlockProcessType, SyncMessage},
manager::{BlockProcessType, BlockProcessingResult, SyncMessage},
};
use beacon_chain::block_verification_types::LookupBlock;
use beacon_chain::block_verification_types::{AsBlock, RangeSyncBlock};
@@ -28,7 +28,9 @@ use store::KzgCommitment;
use tracing::{debug, debug_span, error, info, instrument, warn};
use types::data::FixedBlobSidecarList;
use types::kzg_ext::format_kzg_commitments;
use types::{BlockImportSource, DataColumnSidecarList, Epoch, Hash256};
use types::{
BlockImportSource, DataColumnSidecarList, Epoch, Hash256, SignedExecutionPayloadEnvelope,
};
/// Id associated to a batch processing request, either a sync batch or a parent lookup.
#[derive(Clone, Debug, PartialEq)]
@@ -73,6 +75,77 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
Box::pin(process_fn)
}
/// Returns an async closure which processes a payload envelope received via RPC.
pub fn generate_rpc_envelope_process_fn(
self: Arc<Self>,
envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
seen_timestamp: Duration,
process_type: BlockProcessType,
) -> AsyncFn {
let process_fn = async move {
self.process_rpc_envelope(envelope, seen_timestamp, process_type)
.await;
};
Box::pin(process_fn)
}
/// Process an execution payload envelope received via RPC.
async fn process_rpc_envelope(
self: Arc<Self>,
envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
_seen_timestamp: Duration,
process_type: BlockProcessType,
) {
let beacon_block_root = envelope.beacon_block_root();
// Verify the envelope using the gossip verification path (same checks apply to RPC)
let verified_envelope = match self.chain.verify_envelope_for_gossip(envelope).await {
Ok(verified) => verified,
Err(e) => {
debug!(
error = ?e,
?beacon_block_root,
"RPC payload envelope failed verification"
);
self.send_sync_message(SyncMessage::BlockComponentProcessed {
process_type,
result: BlockProcessingResult::Err(e.into()),
});
return;
}
};
// Process the verified envelope
let result = self
.chain
.process_execution_payload_envelope(
beacon_block_root,
verified_envelope,
NotifyExecutionLayer::Yes,
BlockImportSource::Lookup,
#[allow(clippy::result_large_err)]
|| Ok(()),
)
.await;
let processing_result = match result {
Ok(status) => BlockProcessingResult::Ok(status),
Err(e) => {
debug!(
error = ?e,
?beacon_block_root,
"RPC payload envelope processing failed"
);
BlockProcessingResult::Err(e.into())
}
};
self.send_sync_message(SyncMessage::BlockComponentProcessed {
process_type,
result: processing_result,
});
}
/// Returns the `process_fn` and `ignore_fn` required when requeuing an RPC block.
pub fn generate_lookup_beacon_block_fns(
self: Arc<Self>,

View File

@@ -19,13 +19,14 @@ use lighthouse_network::{
};
use logging::TimeLatch;
use logging::crit;
use slot_clock::SlotClock;
use slot_clock::{SlotClock, timestamp_now};
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, error, trace, warn};
use types::{
BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, PartialDataColumn, SignedBeaconBlock,
SignedExecutionPayloadEnvelope,
};
/// Handles messages from the network and routes them to the appropriate service to be handled.
@@ -341,10 +342,13 @@ impl<T: BeaconChainTypes> Router<T> {
Response::DataColumnsByRange(data_column) => {
self.on_data_columns_by_range_response(peer_id, app_request_id, data_column);
}
// TODO(EIP-7732): implement outgoing payload envelopes by range and root
// responses once sync manager requests them.
Response::PayloadEnvelopesByRoot(_) | Response::PayloadEnvelopesByRange(_) => {
debug!("Requesting envelopes by root and by range not supported yet");
Response::PayloadEnvelopesByRoot(envelope) => {
self.on_payload_envelopes_by_root_response(peer_id, app_request_id, envelope);
}
// TODO(EIP-7732): implement outgoing payload envelopes by range responses once
// range sync requests them.
Response::PayloadEnvelopesByRange(_) => {
error!(%peer_id, "Unexpected PayloadEnvelopesByRange response");
}
// Light client responses should not be received
Response::LightClientBootstrap(_)
@@ -718,6 +722,40 @@ impl<T: BeaconChainTypes> Router<T> {
});
}
/// Handle a `PayloadEnvelopesByRoot` response from the peer.
pub fn on_payload_envelopes_by_root_response(
&mut self,
peer_id: PeerId,
app_request_id: AppRequestId,
envelope: Option<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>,
) {
let sync_request_id = match app_request_id {
AppRequestId::Sync(sync_id) => match sync_id {
id @ SyncRequestId::SinglePayloadEnvelope { .. } => id,
other => {
crit!(request = ?other, "PayloadEnvelopesByRoot response on incorrect request");
return;
}
},
AppRequestId::Router => {
crit!(%peer_id, "All PayloadEnvelopesByRoot requests belong to sync");
return;
}
AppRequestId::Internal => unreachable!("Handled internally"),
};
trace!(
%peer_id,
"Received PayloadEnvelopesByRoot Response"
);
self.send_to_sync(SyncMessage::RpcPayloadEnvelope {
peer_id,
sync_request_id,
envelope,
seen_timestamp: timestamp_now(),
});
}
/// Handle a `BlobsByRoot` response from the peer.
pub fn on_blobs_by_root_response(
&mut self,

View File

@@ -2,7 +2,7 @@ use crate::sync::block_lookups::single_block_lookup::{
LookupRequestError, SingleBlockLookup, SingleLookupRequestState,
};
use crate::sync::block_lookups::{
BlobRequestState, BlockRequestState, CustodyRequestState, PeerId,
BlobRequestState, BlockRequestState, CustodyRequestState, EnvelopeRequestState, PeerId,
};
use crate::sync::manager::BlockProcessType;
use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext};
@@ -12,16 +12,17 @@ use parking_lot::RwLock;
use std::collections::HashSet;
use std::sync::Arc;
use types::data::FixedBlobSidecarList;
use types::{DataColumnSidecarList, SignedBeaconBlock};
use types::{DataColumnSidecarList, SignedBeaconBlock, SignedExecutionPayloadEnvelope};
use super::SingleLookupId;
use super::single_block_lookup::{ComponentRequests, DownloadResult};
#[derive(Debug, Copy, Clone)]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ResponseType {
Block,
Blob,
CustodyColumn,
Envelope,
}
/// This trait unifies common single block lookup functionality across blocks and blobs. This
@@ -151,6 +152,7 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
ComponentRequests::WaitingForBlock => Err("waiting for block"),
ComponentRequests::ActiveBlobRequest(request, _) => Ok(request),
ComponentRequests::ActiveCustodyRequest { .. } => Err("expecting custody request"),
ComponentRequests::ActiveEnvelopeRequest { .. } => Err("expecting envelope request"),
ComponentRequests::NotNeeded { .. } => Err("not needed"),
}
}
@@ -205,6 +207,7 @@ impl<T: BeaconChainTypes> RequestState<T> for CustodyRequestState<T::EthSpec> {
ComponentRequests::WaitingForBlock => Err("waiting for block"),
ComponentRequests::ActiveBlobRequest { .. } => Err("expecting blob request"),
ComponentRequests::ActiveCustodyRequest(request) => Ok(request),
ComponentRequests::ActiveEnvelopeRequest { .. } => Err("expecting envelope request"),
ComponentRequests::NotNeeded { .. } => Err("not needed"),
}
}
@@ -215,3 +218,52 @@ impl<T: BeaconChainTypes> RequestState<T> for CustodyRequestState<T::EthSpec> {
&mut self.state
}
}
impl<T: BeaconChainTypes> RequestState<T> for EnvelopeRequestState<T::EthSpec> {
type VerifiedResponseType = Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>;
fn make_request(
&self,
id: Id,
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
_: usize,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupRequestResult, LookupRequestError> {
cx.envelope_lookup_request(id, lookup_peers, self.block_root)
.map_err(LookupRequestError::SendFailedNetwork)
}
fn send_for_processing(
id: Id,
download_result: DownloadResult<Self::VerifiedResponseType>,
cx: &SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
let DownloadResult {
value,
block_root,
seen_timestamp,
..
} = download_result;
cx.send_envelope_for_processing(id, value, seen_timestamp, block_root)
.map_err(LookupRequestError::SendFailedProcessor)
}
fn response_type() -> ResponseType {
ResponseType::Envelope
}
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> Result<&mut Self, &'static str> {
match &mut request.component_requests {
ComponentRequests::ActiveEnvelopeRequest(request) => Ok(request),
_ => Err("expecting envelope request"),
}
}
fn get_state(&self) -> &SingleLookupRequestState<Self::VerifiedResponseType> {
&self.state
}
fn get_state_mut(&mut self) -> &mut SingleLookupRequestState<Self::VerifiedResponseType> {
&mut self.state
}
}

View File

@@ -22,7 +22,9 @@
use self::parent_chain::{NodeChain, compute_parent_chains};
pub use self::single_block_lookup::DownloadResult;
use self::single_block_lookup::{LookupRequestError, LookupResult, SingleBlockLookup};
use self::single_block_lookup::{
AwaitingParent, LookupRequestError, LookupResult, SingleBlockLookup,
};
use super::manager::{BlockProcessType, BlockProcessingResult, SLOT_IMPORT_TOLERANCE};
use super::network_context::{PeerGroup, RpcResponseError, SyncNetworkContext};
use crate::metrics;
@@ -39,7 +41,9 @@ use fnv::FnvHashMap;
use lighthouse_network::service::api_types::SingleLookupReqId;
use lighthouse_network::{PeerAction, PeerId};
use lru_cache::LRUTimeCache;
pub use single_block_lookup::{BlobRequestState, BlockRequestState, CustodyRequestState};
pub use single_block_lookup::{
BlobRequestState, BlockRequestState, CustodyRequestState, EnvelopeRequestState,
};
use std::collections::hash_map::Entry;
use std::sync::Arc;
use std::time::Duration;
@@ -109,6 +113,7 @@ pub type SingleLookupId = u32;
enum Action {
Retry,
ParentUnknown { parent_root: Hash256 },
ParentEnvelopeUnknown { parent_root: Hash256 },
Drop(/* reason: */ String),
Continue,
}
@@ -213,7 +218,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
self.new_current_lookup(
block_root,
Some(block_component),
Some(parent_root),
Some(AwaitingParent::Block(parent_root)),
// On a `UnknownParentBlock` or `UnknownParentBlob` event the peer is not required
// to have the rest of the block components (refer to decoupled blob gossip). Create
// the lookup with zero peers to house the block components.
@@ -225,17 +230,48 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
}
/// Seach a block whose parent root is unknown.
/// A child block's parent envelope is missing. Create a child lookup (with the block component)
/// that waits for the parent envelope, and an envelope-only lookup for the parent.
///
/// Returns true if both lookups are created or already exist.
#[must_use = "only reference the new lookup if returns true"]
pub fn search_child_and_parent_envelope(
&mut self,
block_root: Hash256,
block_component: BlockComponent<T::EthSpec>,
parent_root: Hash256,
peer_id: PeerId,
cx: &mut SyncNetworkContext<T>,
) -> bool {
let envelope_lookup_exists =
self.search_parent_envelope_of_child(parent_root, &[peer_id], cx);
if envelope_lookup_exists {
// Create child lookup that waits for the parent envelope.
// The child block itself has already been seen, so we pass it as a component.
self.new_current_lookup(
block_root,
Some(block_component),
Some(AwaitingParent::Envelope(parent_root)),
&[],
cx,
)
} else {
false
}
}
/// Search a block whose parent root is unknown.
///
/// Returns true if the lookup is created or already exists
#[must_use = "only reference the new lookup if returns true"]
pub fn search_unknown_block(
&mut self,
block_root: Hash256,
block_component: Option<BlockComponent<T::EthSpec>>,
peer_source: &[PeerId],
cx: &mut SyncNetworkContext<T>,
) -> bool {
self.new_current_lookup(block_root, None, None, peer_source, cx)
self.new_current_lookup(block_root, block_component, None, peer_source, cx)
}
/// A block or blob triggers the search of a parent.
@@ -343,6 +379,57 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
self.new_current_lookup(block_root_to_search, None, None, peers, cx)
}
/// A block triggers the search of a parent envelope.
#[must_use = "only reference the new lookup if returns true"]
pub fn search_parent_envelope_of_child(
&mut self,
parent_root: Hash256,
peers: &[PeerId],
cx: &mut SyncNetworkContext<T>,
) -> bool {
// Check if there's already a lookup for this root (could be a block lookup or envelope
// lookup). If so, add peers and let it handle the envelope.
if let Some((&lookup_id, _lookup)) = self
.single_block_lookups
.iter_mut()
.find(|(_, lookup)| lookup.is_for_block(parent_root))
{
if let Err(e) = self.add_peers_to_lookup_and_ancestors(lookup_id, peers, cx) {
warn!(error = ?e, "Error adding peers to envelope lookup");
}
return true;
}
if self.single_block_lookups.len() >= MAX_LOOKUPS {
warn!(?parent_root, "Dropping envelope lookup reached max");
return false;
}
let lookup = SingleBlockLookup::new_envelope_only(parent_root, peers, cx.next_id());
let _guard = lookup.span.clone().entered();
let id = lookup.id;
let lookup = match self.single_block_lookups.entry(id) {
Entry::Vacant(entry) => entry.insert(lookup),
Entry::Occupied(_) => {
warn!(id, "Lookup exists with same id");
return false;
}
};
debug!(
?peers,
?parent_root,
id = lookup.id,
"Created envelope-only lookup"
);
metrics::inc_counter(&metrics::SYNC_LOOKUP_CREATED);
self.metrics.created_lookups += 1;
let result = lookup.continue_requests(cx);
self.on_lookup_result(id, result, "new_envelope_lookup", cx)
}
/// Searches for a single block hash. If the blocks parent is unknown, a chain of blocks is
/// constructed.
/// Returns true if the lookup is created or already exists
@@ -351,7 +438,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
&mut self,
block_root: Hash256,
block_component: Option<BlockComponent<T::EthSpec>>,
awaiting_parent: Option<Hash256>,
awaiting_parent: Option<AwaitingParent>,
peers: &[PeerId],
cx: &mut SyncNetworkContext<T>,
) -> bool {
@@ -386,13 +473,14 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
// Ensure that awaiting parent exists, otherwise this lookup won't be able to make progress
if let Some(awaiting_parent) = awaiting_parent
if let Some(AwaitingParent::Block(parent_root) | AwaitingParent::Envelope(parent_root)) =
awaiting_parent
&& !self
.single_block_lookups
.iter()
.any(|(_, lookup)| lookup.is_for_block(awaiting_parent))
.any(|(_, lookup)| lookup.is_for_block(parent_root))
{
warn!(block_root = ?awaiting_parent, "Ignoring child lookup parent lookup not found");
warn!(block_root = ?parent_root, "Ignoring child lookup parent lookup not found");
return false;
}
@@ -426,9 +514,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
debug!(
?peers,
?block_root,
awaiting_parent = awaiting_parent
.map(|root| root.to_string())
.unwrap_or("none".to_owned()),
?awaiting_parent,
id = lookup.id,
"Created block lookup"
);
@@ -559,6 +645,15 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
BlockProcessType::SingleCustodyColumn(id) => {
self.on_processing_result_inner::<CustodyRequestState<T::EthSpec>>(id, result, cx)
}
BlockProcessType::SinglePayloadEnvelope { id, block_root } => {
let result = self
.on_processing_result_inner::<EnvelopeRequestState<T::EthSpec>>(id, result, cx);
// On successful envelope import, unblock child lookups waiting for this envelope
if matches!(&result, Ok(LookupResult::Completed)) {
self.continue_envelope_child_lookups(block_root, cx);
}
result
}
};
self.on_lookup_result(process_type.id(), lookup_result, "processing_result", cx);
}
@@ -645,6 +740,12 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
request_state.revert_to_awaiting_processing()?;
Action::ParentUnknown { parent_root }
}
BlockError::ParentEnvelopeUnknown { parent_root } => {
// The parent block is known but its execution payload envelope is missing.
// Revert to awaiting processing and fetch the envelope via RPC.
request_state.revert_to_awaiting_processing()?;
Action::ParentEnvelopeUnknown { parent_root }
}
ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => {
// These errors indicate that the execution layer is offline
// and failed to validate the execution payload. Do not downscore peer.
@@ -667,6 +768,26 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// We opt to drop the lookup instead.
Action::Drop(format!("{e:?}"))
}
BlockError::PayloadEnvelopeError { e, penalize_peer } => {
debug!(
?block_root,
error = ?e,
"Payload envelope processing error"
);
if penalize_peer {
let peer_group = request_state.on_processing_failure()?;
for peer in peer_group.all() {
cx.report_peer(
*peer,
PeerAction::MidToleranceError,
"lookup_envelope_processing_failure",
);
}
Action::Retry
} else {
Action::Drop(format!("{e:?}"))
}
}
other => {
debug!(
?block_root,
@@ -701,6 +822,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
ResponseType::CustodyColumn => {
"lookup_custody_column_processing_failure"
}
ResponseType::Envelope => "lookup_envelope_processing_failure",
},
);
}
@@ -742,6 +864,25 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
)))
}
}
Action::ParentEnvelopeUnknown { parent_root } => {
let peers = lookup.all_peers();
lookup.set_awaiting_parent_envelope(parent_root);
let envelope_lookup_exists =
self.search_parent_envelope_of_child(parent_root, &peers, cx);
if envelope_lookup_exists {
debug!(
id = lookup_id,
?block_root,
?parent_root,
"Marking lookup as awaiting parent envelope"
);
Ok(LookupResult::Pending)
} else {
Err(LookupRequestError::Failed(format!(
"Envelope lookup could not be created for {parent_root:?}"
)))
}
}
Action::Drop(reason) => {
// Drop with noop
Err(LookupRequestError::Failed(reason))
@@ -791,7 +932,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
let mut lookup_results = vec![]; // < need to buffer lookup results to not re-borrow &mut self
for (id, lookup) in self.single_block_lookups.iter_mut() {
if lookup.awaiting_parent() == Some(block_root) {
if lookup.awaiting_parent_block() == Some(block_root) {
lookup.resolve_awaiting_parent();
debug!(
parent_root = ?block_root,
@@ -809,6 +950,33 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
}
/// Makes progress on lookups that were waiting for a parent envelope to be imported.
pub fn continue_envelope_child_lookups(
&mut self,
block_root: Hash256,
cx: &mut SyncNetworkContext<T>,
) {
let mut lookup_results = vec![];
for (id, lookup) in self.single_block_lookups.iter_mut() {
if lookup.awaiting_parent_envelope() == Some(block_root) {
lookup.resolve_awaiting_parent();
debug!(
envelope_root = ?block_root,
id,
block_root = ?lookup.block_root(),
"Continuing lookup after envelope imported"
);
let result = lookup.continue_requests(cx);
lookup_results.push((*id, result));
}
}
for (id, result) in lookup_results {
self.on_lookup_result(id, result, "continue_envelope_child_lookups", cx);
}
}
/// Drops `dropped_id` lookup and all its children recursively. Lookups awaiting a parent need
/// the parent to make progress to resolve, therefore we must drop them if the parent is
/// dropped.
@@ -824,10 +992,14 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
metrics::inc_counter_vec(&metrics::SYNC_LOOKUP_DROPPED, &[reason]);
self.metrics.dropped_lookups += 1;
let dropped_root = dropped_lookup.block_root();
let child_lookups = self
.single_block_lookups
.iter()
.filter(|(_, lookup)| lookup.awaiting_parent() == Some(dropped_lookup.block_root()))
.filter(|(_, lookup)| {
lookup.awaiting_parent_block() == Some(dropped_root)
|| lookup.awaiting_parent_envelope() == Some(dropped_root)
})
.map(|(id, _)| *id)
.collect::<Vec<_>>();
@@ -995,17 +1167,15 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
&'a self,
lookup: &'a SingleBlockLookup<T>,
) -> Result<&'a SingleBlockLookup<T>, String> {
if let Some(awaiting_parent) = lookup.awaiting_parent() {
if let Some(parent_root) = lookup.awaiting_parent_block() {
if let Some(lookup) = self
.single_block_lookups
.values()
.find(|l| l.block_root() == awaiting_parent)
.find(|l| l.block_root() == parent_root)
{
self.find_oldest_ancestor_lookup(lookup)
} else {
Err(format!(
"Lookup references unknown parent {awaiting_parent:?}"
))
Err(format!("Lookup references unknown parent {parent_root:?}"))
}
} else {
Ok(lookup)
@@ -1038,7 +1208,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
}
if let Some(parent_root) = lookup.awaiting_parent() {
if let Some(parent_root) = lookup.awaiting_parent_block() {
if let Some((&child_id, _)) = self
.single_block_lookups
.iter()

View File

@@ -13,7 +13,7 @@ impl<T: BeaconChainTypes> From<&SingleBlockLookup<T>> for Node {
fn from(value: &SingleBlockLookup<T>) -> Self {
Self {
block_root: value.block_root(),
parent_root: value.awaiting_parent(),
parent_root: value.awaiting_parent_block(),
}
}
}

View File

@@ -16,7 +16,9 @@ use store::Hash256;
use strum::IntoStaticStr;
use tracing::{Span, debug_span};
use types::data::FixedBlobSidecarList;
use types::{DataColumnSidecarList, EthSpec, SignedBeaconBlock, Slot};
use types::{
DataColumnSidecarList, EthSpec, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot,
};
// Dedicated enum for LookupResult to force its usage
#[must_use = "LookupResult must be handled with on_lookup_result"]
@@ -56,6 +58,14 @@ pub enum LookupRequestError {
},
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AwaitingParent {
/// Waiting for the parent block to be imported.
Block(Hash256),
/// The parent block is imported but its execution payload envelope is missing.
Envelope(Hash256),
}
#[derive(Educe)]
#[educe(Debug(bound(T: BeaconChainTypes)))]
pub struct SingleBlockLookup<T: BeaconChainTypes> {
@@ -69,7 +79,7 @@ pub struct SingleBlockLookup<T: BeaconChainTypes> {
#[educe(Debug(method(fmt_peer_set_as_len)))]
peers: Arc<RwLock<HashSet<PeerId>>>,
block_root: Hash256,
awaiting_parent: Option<Hash256>,
awaiting_parent: Option<AwaitingParent>,
created: Instant,
pub(crate) span: Span,
}
@@ -79,6 +89,7 @@ pub(crate) enum ComponentRequests<E: EthSpec> {
WaitingForBlock,
ActiveBlobRequest(BlobRequestState<E>, usize),
ActiveCustodyRequest(CustodyRequestState<E>),
ActiveEnvelopeRequest(EnvelopeRequestState<E>),
// When printing in debug this state display the reason why it's not needed
#[allow(dead_code)]
NotNeeded(&'static str),
@@ -89,7 +100,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
requested_block_root: Hash256,
peers: &[PeerId],
id: Id,
awaiting_parent: Option<Hash256>,
awaiting_parent: Option<AwaitingParent>,
) -> Self {
let lookup_span = debug_span!(
"lh_single_block_lookup",
@@ -109,10 +120,33 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
}
}
/// Create an envelope-only lookup. The block is already imported, we just need the envelope.
pub fn new_envelope_only(block_root: Hash256, peers: &[PeerId], id: Id) -> Self {
let mut lookup = Self::new(block_root, peers, id, None);
// Block is already imported, mark as completed
lookup
.block_request_state
.state
.on_completed_request("block already imported")
.expect("block state starts as AwaitingDownload");
lookup.component_requests =
ComponentRequests::ActiveEnvelopeRequest(EnvelopeRequestState::new(block_root));
lookup
}
/// Reset the status of all internal requests
pub fn reset_requests(&mut self) {
self.block_request_state = BlockRequestState::new(self.block_root);
self.component_requests = ComponentRequests::WaitingForBlock;
match &self.component_requests {
ComponentRequests::ActiveEnvelopeRequest(_) => {
self.component_requests = ComponentRequests::ActiveEnvelopeRequest(
EnvelopeRequestState::new(self.block_root),
);
}
_ => {
self.component_requests = ComponentRequests::WaitingForBlock;
}
}
}
/// Return the slot of this lookup's block if it's currently cached as `AwaitingProcessing`
@@ -128,18 +162,37 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
self.block_root
}
pub fn awaiting_parent(&self) -> Option<Hash256> {
pub fn awaiting_parent(&self) -> Option<AwaitingParent> {
self.awaiting_parent
}
/// Mark this lookup as awaiting a parent lookup from being processed. Meanwhile don't send
/// components for processing.
pub fn set_awaiting_parent(&mut self, parent_root: Hash256) {
self.awaiting_parent = Some(parent_root)
/// Returns the parent root if awaiting a parent block.
pub fn awaiting_parent_block(&self) -> Option<Hash256> {
match self.awaiting_parent {
Some(AwaitingParent::Block(root)) => Some(root),
_ => None,
}
}
/// Mark this lookup as no longer awaiting a parent lookup. Components can be sent for
/// processing.
/// Returns the parent root if awaiting a parent envelope.
pub fn awaiting_parent_envelope(&self) -> Option<Hash256> {
match self.awaiting_parent {
Some(AwaitingParent::Envelope(root)) => Some(root),
_ => None,
}
}
/// Mark this lookup as awaiting a parent block to be imported before processing.
pub fn set_awaiting_parent(&mut self, parent_root: Hash256) {
self.awaiting_parent = Some(AwaitingParent::Block(parent_root));
}
/// Mark this lookup as awaiting a parent envelope to be imported before processing.
pub fn set_awaiting_parent_envelope(&mut self, parent_root: Hash256) {
self.awaiting_parent = Some(AwaitingParent::Envelope(parent_root));
}
/// Mark this lookup as no longer awaiting any parent.
pub fn resolve_awaiting_parent(&mut self) {
self.awaiting_parent = None;
}
@@ -180,6 +233,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
ComponentRequests::WaitingForBlock => false,
ComponentRequests::ActiveBlobRequest(request, _) => request.state.is_processed(),
ComponentRequests::ActiveCustodyRequest(request) => request.state.is_processed(),
ComponentRequests::ActiveEnvelopeRequest(request) => request.state.is_processed(),
ComponentRequests::NotNeeded { .. } => true,
}
}
@@ -199,6 +253,9 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
ComponentRequests::ActiveCustodyRequest(request) => {
request.state.is_awaiting_event()
}
ComponentRequests::ActiveEnvelopeRequest(request) => {
request.state.is_awaiting_event()
}
ComponentRequests::NotNeeded { .. } => false,
}
}
@@ -268,6 +325,9 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
ComponentRequests::ActiveCustodyRequest(_) => {
self.continue_request::<CustodyRequestState<T::EthSpec>>(cx, 0)?
}
ComponentRequests::ActiveEnvelopeRequest(_) => {
self.continue_request::<EnvelopeRequestState<T::EthSpec>>(cx, 0)?
}
ComponentRequests::NotNeeded { .. } => {} // do nothing
}
@@ -289,7 +349,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
expected_blobs: usize,
) -> Result<(), LookupRequestError> {
let id = self.id;
let awaiting_parent = self.awaiting_parent.is_some();
let awaiting_event = self.awaiting_parent.is_some();
let request =
R::request_state_mut(self).map_err(|e| LookupRequestError::BadState(e.to_owned()))?;
@@ -333,7 +393,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
// Otherwise, attempt to progress awaiting processing
// If this request is awaiting a parent lookup to be processed, do not send for processing.
// The request will be rejected with unknown parent error.
} else if !awaiting_parent {
} else if !awaiting_event {
// maybe_start_processing returns Some if state == AwaitingProcess. This pattern is
// useful to conditionally access the result data.
if let Some(result) = request.get_state_mut().maybe_start_processing() {
@@ -429,6 +489,26 @@ impl<E: EthSpec> BlockRequestState<E> {
}
}
/// The state of the envelope request component of a `SingleBlockLookup`.
/// Used for envelope-only lookups where the parent block is already imported
/// but its execution payload envelope is missing.
#[derive(Educe)]
#[educe(Debug)]
pub struct EnvelopeRequestState<E: EthSpec> {
#[educe(Debug(ignore))]
pub block_root: Hash256,
pub state: SingleLookupRequestState<Arc<SignedExecutionPayloadEnvelope<E>>>,
}
impl<E: EthSpec> EnvelopeRequestState<E> {
pub fn new(block_root: Hash256) -> Self {
Self {
block_root,
state: SingleLookupRequestState::new(),
}
}
}
#[derive(Debug, Clone)]
pub struct DownloadResult<T: Clone> {
pub value: T,

View File

@@ -45,6 +45,7 @@ use crate::service::NetworkMessage;
use crate::status::ToStatusMessage;
use crate::sync::block_lookups::{
BlobRequestState, BlockComponent, BlockRequestState, CustodyRequestState, DownloadResult,
EnvelopeRequestState,
};
use crate::sync::custody_backfill_sync::CustodyBackFillSync;
use crate::sync::network_context::{PeerGroup, RpcResponseResult};
@@ -65,7 +66,7 @@ use lighthouse_network::types::{NetworkGlobals, SyncState};
use lighthouse_network::{PeerAction, PeerId};
use logging::crit;
use lru_cache::LRUTimeCache;
use slot_clock::SlotClock;
use slot_clock::{SlotClock, timestamp_now};
use std::ops::Sub;
use std::sync::Arc;
use std::time::Duration;
@@ -73,7 +74,8 @@ use strum::IntoStaticStr;
use tokio::sync::mpsc;
use tracing::{debug, error, info, trace};
use types::{
BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock, Slot,
BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock,
SignedExecutionPayloadEnvelope, Slot,
};
/// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync
@@ -132,6 +134,14 @@ pub enum SyncMessage<E: EthSpec> {
seen_timestamp: Duration,
},
/// An execution payload envelope has been received from the RPC.
RpcPayloadEnvelope {
sync_request_id: SyncRequestId,
peer_id: PeerId,
envelope: Option<Arc<SignedExecutionPayloadEnvelope<E>>>,
seen_timestamp: Duration,
},
/// A block with an unknown parent has been received.
UnknownParentBlock(PeerId, Arc<SignedBeaconBlock<E>>, Hash256),
@@ -139,7 +149,11 @@ pub enum SyncMessage<E: EthSpec> {
UnknownParentBlob(PeerId, Arc<BlobSidecar<E>>),
/// A data column with an unknown parent has been received.
UnknownParentDataColumn(PeerId, Arc<DataColumnSidecar<E>>),
/// For Fulu columns, this signifies a missing parent, for Gloas, a missing block with the bid.
UnknownDataColumnParentOrBlock(PeerId, Arc<DataColumnSidecar<E>>),
/// A block's parent is known but its execution payload envelope has not been received yet.
UnknownParentEnvelope(PeerId, Arc<SignedBeaconBlock<E>>, Hash256),
/// A partial data column with an unknown parent has been received.
UnknownParentPartialDataColumn {
@@ -191,6 +205,7 @@ pub enum BlockProcessType {
SingleBlock { id: Id },
SingleBlob { id: Id },
SingleCustodyColumn(Id),
SinglePayloadEnvelope { id: Id, block_root: Hash256 },
}
impl BlockProcessType {
@@ -198,7 +213,8 @@ impl BlockProcessType {
match self {
BlockProcessType::SingleBlock { id }
| BlockProcessType::SingleBlob { id }
| BlockProcessType::SingleCustodyColumn(id) => *id,
| BlockProcessType::SingleCustodyColumn(id)
| BlockProcessType::SinglePayloadEnvelope { id, .. } => *id,
}
}
}
@@ -426,7 +442,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
//
// TODO: This fork-choice check is potentially duplicated, review code
if !self.chain.block_is_known_to_fork_choice(&remote.head_root) {
self.handle_unknown_block_root(peer_id, remote.head_root);
self.handle_unknown_block_root(peer_id, remote.head_root, None);
}
}
}
@@ -512,6 +528,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
SyncRequestId::DataColumnsByRange(req_id) => {
self.on_data_columns_by_range_response(req_id, peer_id, RpcEvent::RPCError(error))
}
SyncRequestId::SinglePayloadEnvelope { id } => {
self.on_single_envelope_response(id, peer_id, RpcEvent::RPCError(error))
}
}
}
@@ -846,6 +865,17 @@ impl<T: BeaconChainTypes> SyncManager<T> {
} => {
self.rpc_data_column_received(sync_request_id, peer_id, data_column, seen_timestamp)
}
SyncMessage::RpcPayloadEnvelope {
sync_request_id,
peer_id,
envelope,
seen_timestamp,
} => self.rpc_payload_envelope_received(
sync_request_id,
peer_id,
envelope,
seen_timestamp,
),
SyncMessage::UnknownParentBlock(peer_id, block, block_root) => {
let block_slot = block.slot();
let parent_root = block.parent_root();
@@ -881,7 +911,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}),
);
}
SyncMessage::UnknownParentDataColumn(peer_id, data_column) => {
SyncMessage::UnknownDataColumnParentOrBlock(peer_id, data_column) => {
let data_column_slot = data_column.slot();
let block_root = data_column.block_root();
match data_column.as_ref() {
@@ -905,12 +935,46 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}),
);
}
// TODO(gloas) support gloas data column variant
DataColumnSidecar::Gloas(_) => {
error!("Gloas variant not yet supported")
debug!(%block_root, "Received unknown block data column message");
self.handle_unknown_block_root(
peer_id,
block_root,
Some(BlockComponent::DataColumn(DownloadResult {
value: block_root,
block_root,
seen_timestamp: self
.chain
.slot_clock
.now_duration()
.unwrap_or_default(),
peer_group: PeerGroup::from_single(peer_id),
})),
);
}
}
}
SyncMessage::UnknownParentEnvelope(peer_id, block, block_root) => {
let block_slot = block.slot();
let parent_root = block.parent_root();
debug!(
%block_root,
%parent_root,
"Parent envelope not yet available, creating envelope lookup"
);
self.handle_unknown_parent_envelope(
peer_id,
block_root,
parent_root,
block_slot,
BlockComponent::Block(DownloadResult {
value: block.block_cloned(),
block_root,
seen_timestamp: timestamp_now(),
peer_group: PeerGroup::from_single(peer_id),
}),
);
}
SyncMessage::UnknownParentPartialDataColumn {
peer_id,
block_root,
@@ -935,7 +999,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
if !self.notified_unknown_roots.contains(&(peer_id, block_root)) {
self.notified_unknown_roots.insert((peer_id, block_root));
debug!(?block_root, ?peer_id, "Received unknown block hash message");
self.handle_unknown_block_root(peer_id, block_root);
self.handle_unknown_block_root(peer_id, block_root, None);
}
}
SyncMessage::Disconnect(peer_id) => {
@@ -1036,11 +1100,51 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}
fn handle_unknown_block_root(&mut self, peer_id: PeerId, block_root: Hash256) {
/// Handle a block whose parent block is known but parent envelope is missing.
/// Creates an envelope-only lookup for the parent and a child lookup that waits for it.
fn handle_unknown_parent_envelope(
&mut self,
peer_id: PeerId,
block_root: Hash256,
parent_root: Hash256,
slot: Slot,
block_component: BlockComponent<T::EthSpec>,
) {
match self.should_search_for_block(Some(slot), &peer_id) {
Ok(_) => {
if self.block_lookups.search_child_and_parent_envelope(
block_root,
block_component,
parent_root,
peer_id,
&mut self.network,
) {
// Lookups created
} else {
debug!(
?block_root,
?parent_root,
"No lookup created for child and parent envelope"
);
}
}
Err(reason) => {
debug!(%block_root, %parent_root, reason, "Ignoring unknown parent envelope request");
}
}
}
fn handle_unknown_block_root(
&mut self,
peer_id: PeerId,
block_root: Hash256,
block_component: Option<BlockComponent<T::EthSpec>>,
) {
match self.should_search_for_block(None, &peer_id) {
Ok(_) => {
if self.block_lookups.search_unknown_block(
block_root,
block_component,
&[peer_id],
&mut self.network,
) {
@@ -1231,6 +1335,46 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}
fn rpc_payload_envelope_received(
&mut self,
sync_request_id: SyncRequestId,
peer_id: PeerId,
envelope: Option<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>,
seen_timestamp: Duration,
) {
match sync_request_id {
SyncRequestId::SinglePayloadEnvelope { id } => self.on_single_envelope_response(
id,
peer_id,
RpcEvent::from_chunk(envelope, seen_timestamp),
),
_ => {
crit!(%peer_id, "bad request id for payload envelope");
}
}
}
fn on_single_envelope_response(
&mut self,
id: SingleLookupReqId,
peer_id: PeerId,
rpc_event: RpcEvent<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>,
) {
if let Some(resp) = self
.network
.on_single_envelope_response(id, peer_id, rpc_event)
{
self.block_lookups
.on_download_response::<EnvelopeRequestState<T::EthSpec>>(
id,
resp.map(|(value, seen_timestamp)| {
(value, PeerGroup::from_single(peer_id), seen_timestamp)
}),
&mut self.network,
)
}
}
fn on_single_blob_response(
&mut self,
id: SingleLookupReqId,

View File

@@ -37,6 +37,7 @@ pub use requests::LookupVerifyError;
use requests::{
ActiveRequests, BlobsByRangeRequestItems, BlobsByRootRequestItems, BlocksByRangeRequestItems,
BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems,
PayloadEnvelopesByRootRequestItems, PayloadEnvelopesByRootSingleRequest,
};
#[cfg(test)]
use slot_clock::SlotClock;
@@ -52,7 +53,7 @@ use tracing::{Span, debug, debug_span, error, warn};
use types::data::FixedBlobSidecarList;
use types::{
BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec,
ForkContext, Hash256, SignedBeaconBlock, Slot,
ForkContext, Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot,
};
pub mod custody;
@@ -213,6 +214,9 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
/// A mapping of active DataColumnsByRange requests
data_columns_by_range_requests:
ActiveRequests<DataColumnsByRangeRequestId, DataColumnsByRangeRequestItems<T::EthSpec>>,
/// A mapping of active PayloadEnvelopesByRoot requests
payload_envelopes_by_root_requests:
ActiveRequests<SingleLookupReqId, PayloadEnvelopesByRootRequestItems<T::EthSpec>>,
/// Mapping of active custody column requests for a block root
custody_by_root_requests: FnvHashMap<CustodyRequester, ActiveCustodyRequest<T>>,
@@ -298,6 +302,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
blocks_by_range_requests: ActiveRequests::new("blocks_by_range"),
blobs_by_range_requests: ActiveRequests::new("blobs_by_range"),
data_columns_by_range_requests: ActiveRequests::new("data_columns_by_range"),
payload_envelopes_by_root_requests: ActiveRequests::new("payload_envelopes_by_root"),
custody_by_root_requests: <_>::default(),
components_by_range_requests: FnvHashMap::default(),
custody_backfill_data_column_batch_requests: FnvHashMap::default(),
@@ -326,6 +331,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
blocks_by_range_requests,
blobs_by_range_requests,
data_columns_by_range_requests,
payload_envelopes_by_root_requests,
// custody_by_root_requests is a meta request of data_columns_by_root_requests
custody_by_root_requests: _,
// components_by_range_requests is a meta request of various _by_range requests
@@ -361,12 +367,17 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.active_requests_of_peer(peer_id)
.into_iter()
.map(|req_id| SyncRequestId::DataColumnsByRange(*req_id));
let envelope_by_root_ids = payload_envelopes_by_root_requests
.active_requests_of_peer(peer_id)
.into_iter()
.map(|id| SyncRequestId::SinglePayloadEnvelope { id: *id });
blocks_by_root_ids
.chain(blobs_by_root_ids)
.chain(data_column_by_root_ids)
.chain(blocks_by_range_ids)
.chain(blobs_by_range_ids)
.chain(data_column_by_range_ids)
.chain(envelope_by_root_ids)
.collect()
}
@@ -423,6 +434,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
blocks_by_range_requests,
blobs_by_range_requests,
data_columns_by_range_requests,
payload_envelopes_by_root_requests,
// custody_by_root_requests is a meta request of data_columns_by_root_requests
custody_by_root_requests: _,
// components_by_range_requests is a meta request of various _by_range requests
@@ -445,6 +457,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.chain(blocks_by_range_requests.iter_request_peers())
.chain(blobs_by_range_requests.iter_request_peers())
.chain(data_columns_by_range_requests.iter_request_peers())
.chain(payload_envelopes_by_root_requests.iter_request_peers())
{
*active_request_count_by_peer.entry(peer_id).or_default() += 1;
}
@@ -927,6 +940,74 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
Ok(LookupRequestResult::RequestSent(id.req_id))
}
/// Request a payload envelope for `block_root` from a peer.
pub fn envelope_lookup_request(
&mut self,
lookup_id: SingleLookupId,
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
block_root: Hash256,
) -> Result<LookupRequestResult, RpcRequestSendError> {
let active_request_count_by_peer = self.active_request_count_by_peer();
let Some(peer_id) = lookup_peers
.read()
.iter()
.map(|peer| {
(
active_request_count_by_peer.get(peer).copied().unwrap_or(0),
rand::random::<u32>(),
peer,
)
})
.min()
.map(|(_, _, peer)| *peer)
else {
return Ok(LookupRequestResult::Pending("no peers"));
};
let id = SingleLookupReqId {
lookup_id,
req_id: self.next_id(),
};
let request = PayloadEnvelopesByRootSingleRequest(block_root);
let network_request = RequestType::PayloadEnvelopesByRoot(
request
.into_request(&self.fork_context)
.map_err(RpcRequestSendError::InternalError)?,
);
self.network_send
.send(NetworkMessage::SendRequest {
peer_id,
request: network_request,
app_request_id: AppRequestId::Sync(SyncRequestId::SinglePayloadEnvelope { id }),
})
.map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?;
debug!(
method = "PayloadEnvelopesByRoot",
?block_root,
peer = %peer_id,
%id,
"Sync RPC request sent"
);
let request_span = debug_span!(
parent: Span::current(),
"lh_outgoing_envelope_by_root_request",
%block_root,
);
self.payload_envelopes_by_root_requests.insert(
id,
peer_id,
true,
PayloadEnvelopesByRootRequestItems::new(request),
request_span,
);
Ok(LookupRequestResult::RequestSent(id.req_id))
}
/// Request necessary blobs for `block_root`. Requests only the necessary blobs by checking:
/// - If we have a downloaded but not yet processed block
/// - If the da_checker has a pending block
@@ -1435,6 +1516,27 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.on_rpc_response_result(resp, peer_id)
}
pub(crate) fn on_single_envelope_response(
&mut self,
id: SingleLookupReqId,
peer_id: PeerId,
rpc_event: RpcEvent<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>,
) -> Option<RpcResponseResult<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>> {
let resp = self
.payload_envelopes_by_root_requests
.on_response(id, rpc_event);
let resp = resp.map(|res| {
res.and_then(|(mut envelopes, seen_timestamp)| {
match envelopes.pop() {
Some(envelope) => Ok((envelope, seen_timestamp)),
// Should never happen, request items enforces at least 1 chunk.
None => Err(LookupVerifyError::NotEnoughResponsesReturned { actual: 0 }.into()),
}
})
});
self.on_rpc_response_result(resp, peer_id)
}
pub(crate) fn on_single_blob_response(
&mut self,
id: SingleLookupReqId,
@@ -1610,6 +1712,33 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
})
}
pub fn send_envelope_for_processing(
&self,
id: Id,
envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
seen_timestamp: Duration,
block_root: Hash256,
) -> Result<(), SendErrorProcessor> {
let beacon_processor = self
.beacon_processor_if_enabled()
.ok_or(SendErrorProcessor::ProcessorNotAvailable)?;
debug!(?block_root, ?id, "Sending payload envelope for processing");
beacon_processor
.send_rpc_payload_envelope(
envelope,
seen_timestamp,
BlockProcessType::SinglePayloadEnvelope { id, block_root },
)
.map_err(|e| {
error!(
error = ?e,
"Failed to send sync envelope to processor"
);
SendErrorProcessor::SendError
})
}
pub fn send_blobs_for_processing(
&self,
id: Id,
@@ -1788,6 +1917,10 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
"data_columns_by_range",
self.data_columns_by_range_requests.len(),
),
(
"payload_envelopes_by_root",
self.payload_envelopes_by_root_requests.len(),
),
("custody_by_root", self.custody_by_root_requests.len()),
(
"components_by_range",

View File

@@ -16,6 +16,9 @@ pub use data_columns_by_range::DataColumnsByRangeRequestItems;
pub use data_columns_by_root::{
DataColumnsByRootRequestItems, DataColumnsByRootSingleBlockRequest,
};
pub use payload_envelopes_by_root::{
PayloadEnvelopesByRootRequestItems, PayloadEnvelopesByRootSingleRequest,
};
use crate::metrics;
@@ -27,6 +30,7 @@ mod blocks_by_range;
mod blocks_by_root;
mod data_columns_by_range;
mod data_columns_by_root;
mod payload_envelopes_by_root;
#[derive(Debug, PartialEq, Eq, IntoStaticStr)]
pub enum LookupVerifyError {

View File

@@ -0,0 +1,53 @@
use lighthouse_network::rpc::methods::PayloadEnvelopesByRootRequest;
use std::sync::Arc;
use types::{EthSpec, ForkContext, Hash256, SignedExecutionPayloadEnvelope};
use super::{ActiveRequestItems, LookupVerifyError};
#[derive(Debug, Copy, Clone)]
pub struct PayloadEnvelopesByRootSingleRequest(pub Hash256);
impl PayloadEnvelopesByRootSingleRequest {
pub fn into_request(
self,
fork_context: &ForkContext,
) -> Result<PayloadEnvelopesByRootRequest, String> {
PayloadEnvelopesByRootRequest::new(vec![self.0], fork_context)
}
}
pub struct PayloadEnvelopesByRootRequestItems<E: EthSpec> {
request: PayloadEnvelopesByRootSingleRequest,
items: Vec<Arc<SignedExecutionPayloadEnvelope<E>>>,
}
impl<E: EthSpec> PayloadEnvelopesByRootRequestItems<E> {
pub fn new(request: PayloadEnvelopesByRootSingleRequest) -> Self {
Self {
request,
items: vec![],
}
}
}
impl<E: EthSpec> ActiveRequestItems for PayloadEnvelopesByRootRequestItems<E> {
type Item = Arc<SignedExecutionPayloadEnvelope<E>>;
/// Append a response to the single chunk request. If the chunk is valid, the request is
/// resolved immediately.
/// The active request SHOULD be dropped after `add_response` returns an error
fn add(&mut self, envelope: Self::Item) -> Result<bool, LookupVerifyError> {
let beacon_block_root = envelope.beacon_block_root();
if self.request.0 != beacon_block_root {
return Err(LookupVerifyError::UnrequestedBlockRoot(beacon_block_root));
}
self.items.push(envelope);
// Always returns true, payload envelopes by root expects a single response
Ok(true)
}
fn consume(&mut self) -> Vec<Self::Item> {
std::mem::take(&mut self.items)
}
}

View File

@@ -1474,7 +1474,7 @@ impl TestRig {
peer_id: PeerId,
column: Arc<DataColumnSidecar<E>>,
) {
self.send_sync_message(SyncMessage::UnknownParentDataColumn(peer_id, column));
self.send_sync_message(SyncMessage::UnknownDataColumnParentOrBlock(peer_id, column));
}
fn trigger_unknown_block_from_attestation(&mut self, block_root: Hash256, peer_id: PeerId) {

View File

@@ -416,11 +416,24 @@ where
let (execution_status, execution_payload_parent_hash, execution_payload_block_hash) =
if let Ok(signed_bid) = anchor_block.message().body().signed_execution_payload_bid() {
// Gloas: execution status is irrelevant post-Gloas; payload validation
// is decoupled from beacon blocks.
// At Gloas genesis the block bid is empty (all zeros) per spec, but the
// state holds the EL genesis hash in `latest_block_hash`. Use it so the
// first forkchoice update sends a valid head to the EL.
let parent_hash = if anchor_block.slot() == spec.genesis_slot
&& anchor_state.slot() == spec.genesis_slot
&& signed_bid.message.parent_block_hash.into_root().is_zero()
&& signed_bid.message.block_hash.into_root().is_zero()
{
*anchor_state
.latest_block_hash()
.map_err(Error::BeaconStateError)?
} else {
signed_bid.message.parent_block_hash
};
(
ExecutionStatus::irrelevant(),
Some(signed_bid.message.parent_block_hash),
Some(parent_hash),
Some(signed_bid.message.block_hash),
)
} else if let Ok(execution_payload) = anchor_block.message().execution_payload() {
@@ -1507,6 +1520,11 @@ where
}
}
/// Returns whether the payload envelope has been received for the given block.
pub fn is_payload_received(&self, block_root: &Hash256) -> bool {
self.proto_array.is_payload_received(block_root)
}
/// Returns whether the proposer should extend the execution payload chain of the given block.
pub fn should_extend_payload(&self, block_root: &Hash256) -> Result<bool, Error<T::Error>> {
let proposer_boost_root = self.fc_store.proposer_boost_root();

View File

@@ -167,21 +167,12 @@ pub fn initialize_beacon_state_from_eth1<E: EthSpec>(
// Remove intermediate Fulu fork from `state.fork`.
state.fork_mut().previous_version = spec.gloas_fork_version;
// The genesis block's bid must have block_hash = 0x00 per spec (empty payload).
// Retain the EL genesis hash in latest_block_hash and parent_block_hash so the
// first post-genesis proposer can build on the correct EL head.
let el_genesis_hash = state.latest_execution_payload_bid()?.block_hash;
let bid = state.latest_execution_payload_bid_mut()?;
bid.parent_block_hash = el_genesis_hash;
bid.block_hash = ExecutionBlockHash::default();
// Update latest_block_header to reflect the Gloas genesis block body which contains
// the EL genesis hash in the signed_execution_payload_bid. This is needed because
// BeaconState::new() created the header from BeaconBlock::empty() which has zero bid
// fields, but the spec requires the genesis block's bid to contain the EL block hash
// and the tree hash root of empty ExecutionRequests.
let block = genesis_block(&state, spec)?;
state.latest_block_header_mut().body_root = block.body_root();
}
// Now that we have our validators, initialize the caches (including the committees)
@@ -193,26 +184,13 @@ pub fn initialize_beacon_state_from_eth1<E: EthSpec>(
Ok(state)
}
/// Create an unsigned genesis `BeaconBlock` whose body matches the genesis state.
/// Create an unsigned genesis `BeaconBlock`.
///
/// For Gloas, the block's `signed_execution_payload_bid` is populated from the state's
/// `latest_execution_payload_bid` so that the body root is consistent with
/// `state.latest_block_header.body_root`.
///
/// The returned block has `state_root == Hash256::ZERO`; callers that need the real
/// state root should set it themselves.
pub fn genesis_block<E: EthSpec>(
genesis_state: &BeaconState<E>,
spec: &ChainSpec,
) -> Result<BeaconBlock<E>, BeaconStateError> {
let mut block = BeaconBlock::empty(spec);
if let Ok(block) = block.as_gloas_mut() {
let state_bid = genesis_state.latest_execution_payload_bid()?;
let bid = &mut block.body.signed_execution_payload_bid.message;
bid.block_hash = state_bid.block_hash;
bid.execution_requests_root = state_bid.execution_requests_root;
}
Ok(block)
/// Per spec, the genesis block body is empty (all default fields).
/// `state.latest_block_header.body_root` is set from `BeaconBlock::empty()`,
/// so this function must return the same empty block to keep roots consistent.
pub fn genesis_block<E: EthSpec>(spec: &ChainSpec) -> Result<BeaconBlock<E>, BeaconStateError> {
Ok(BeaconBlock::empty(spec))
}
/// Determine whether a candidate genesis state is suitable for starting the chain.

View File

@@ -553,7 +553,6 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
beacon_nodes.clone(),
context.executor.clone(),
);
let payload_attestation_service = PayloadAttestationService::new(
duties_service.clone(),
validator_store.clone(),

View File

@@ -189,6 +189,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> PayloadAttestationServ
}
let count = messages.len();
let fork_name = self.chain_spec.fork_name_at_slot::<S::E>(slot);
let result = self
.beacon_nodes