mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-30 20:57:10 +00:00
merge conflicts
This commit is contained in:
@@ -942,12 +942,18 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
|
||||
|
||||
// Check that we've received the parent envelope. If not, issue a single envelope
|
||||
// lookup for the parent and queue this block in the reprocess queue.
|
||||
//
|
||||
// The anchor block (proto-array root) is implicitly considered to have its payload
|
||||
// received: there is no envelope to fetch for the anchor (per spec, the anchor is
|
||||
// never added to `store.payloads`), and the anchor is trusted by definition.
|
||||
let parent_is_gloas = chain
|
||||
.spec
|
||||
.fork_name_at_slot::<T::EthSpec>(parent_block.slot)
|
||||
.gloas_enabled();
|
||||
let parent_is_anchor = parent_block.parent_root.is_none();
|
||||
|
||||
if parent_is_gloas
|
||||
&& !parent_is_anchor
|
||||
&& !fork_choice_read_lock.is_payload_received(&block.message().parent_root())
|
||||
{
|
||||
return Err(BlockError::ParentEnvelopeUnknown {
|
||||
|
||||
@@ -14,7 +14,8 @@ use super::{
|
||||
};
|
||||
use crate::{
|
||||
AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes,
|
||||
NotifyExecutionLayer, block_verification_types::AvailableBlockData, metrics,
|
||||
NotifyExecutionLayer, block_verification::PayloadVerificationOutcome,
|
||||
block_verification_types::AvailableBlockData, metrics,
|
||||
payload_envelope_verification::ExecutionPendingEnvelope, validator_monitor::get_slot_delay_ms,
|
||||
};
|
||||
|
||||
@@ -99,9 +100,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
self.import_available_execution_payload_envelope(Box::new(envelope))
|
||||
.await
|
||||
}
|
||||
ExecutedEnvelope::AvailabilityPending() => Err(EnvelopeError::InternalError(
|
||||
"Pending payload envelope not yet implemented".to_owned(),
|
||||
)),
|
||||
ExecutedEnvelope::AvailabilityPending {
|
||||
signed_envelope,
|
||||
import_data,
|
||||
payload_verification_outcome,
|
||||
} => {
|
||||
self.import_pending_execution_payload_envelope(
|
||||
signed_envelope,
|
||||
import_data,
|
||||
payload_verification_outcome,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@@ -186,6 +196,39 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
))
|
||||
}
|
||||
|
||||
/// Import an envelope whose data column availability has not yet been satisfied.
|
||||
///
|
||||
/// Marks the block's payload as received in fork choice and persists the envelope to the
|
||||
/// store, but does not write data column ops. Columns are expected to arrive separately
|
||||
/// (gossip, engineGetBlobs, or reconstruction).
|
||||
#[instrument(skip_all)]
|
||||
pub async fn import_pending_execution_payload_envelope(
|
||||
self: &Arc<Self>,
|
||||
signed_envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
|
||||
import_data: EnvelopeImportData<T::EthSpec>,
|
||||
payload_verification_outcome: PayloadVerificationOutcome,
|
||||
) -> Result<AvailabilityProcessingStatus, EnvelopeError> {
|
||||
let EnvelopeImportData {
|
||||
block_root,
|
||||
_phantom,
|
||||
} = import_data;
|
||||
let block_root = {
|
||||
let chain = self.clone();
|
||||
self.spawn_blocking_handle(
|
||||
move || {
|
||||
chain.import_execution_payload_envelope_pending_columns(
|
||||
signed_envelope,
|
||||
block_root,
|
||||
payload_verification_outcome.payload_verification_status,
|
||||
)
|
||||
},
|
||||
"payload_verification_handle",
|
||||
)
|
||||
.await??
|
||||
};
|
||||
Ok(AvailabilityProcessingStatus::Imported(block_root))
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub async fn import_available_execution_payload_envelope(
|
||||
self: &Arc<Self>,
|
||||
@@ -220,6 +263,50 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
Ok(AvailabilityProcessingStatus::Imported(block_root))
|
||||
}
|
||||
|
||||
/// Same as `import_execution_payload_envelope` but for envelopes whose data columns
|
||||
/// have not yet been received. Marks the payload as received in fork choice and
|
||||
/// persists the envelope; columns are persisted separately as they arrive.
|
||||
#[instrument(skip_all)]
|
||||
fn import_execution_payload_envelope_pending_columns(
|
||||
&self,
|
||||
signed_envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
|
||||
block_root: Hash256,
|
||||
payload_verification_status: PayloadVerificationStatus,
|
||||
) -> Result<Hash256, EnvelopeError> {
|
||||
let fork_choice_reader = self.canonical_head.fork_choice_upgradable_read_lock();
|
||||
if !fork_choice_reader.contains_block(&block_root) {
|
||||
return Err(EnvelopeError::BlockRootUnknown { block_root });
|
||||
}
|
||||
|
||||
let mut fork_choice = parking_lot::RwLockUpgradableReadGuard::upgrade(fork_choice_reader);
|
||||
fork_choice
|
||||
.on_valid_payload_envelope_received(block_root)
|
||||
.map_err(|e| EnvelopeError::InternalError(format!("{e:?}")))?;
|
||||
|
||||
let db_write_timer = metrics::start_timer(&metrics::ENVELOPE_PROCESSING_DB_WRITE);
|
||||
let ops = vec![StoreOp::PutPayloadEnvelope(
|
||||
block_root,
|
||||
signed_envelope.clone(),
|
||||
)];
|
||||
let db_span = info_span!("persist_envelope_pending_columns").entered();
|
||||
if let Err(e) = self.store.do_atomically_with_block_and_blobs_cache(ops) {
|
||||
error!(error = ?e, "Database write failed for pending-columns envelope");
|
||||
return Err(e.into());
|
||||
}
|
||||
drop(db_span);
|
||||
drop(fork_choice);
|
||||
|
||||
let envelope_time_imported = self.slot_clock.now_duration().unwrap_or(Duration::MAX);
|
||||
metrics::stop_timer(db_write_timer);
|
||||
self.import_envelope_update_metrics_and_events(
|
||||
signed_envelope,
|
||||
block_root,
|
||||
payload_verification_status,
|
||||
envelope_time_imported,
|
||||
);
|
||||
Ok(block_root)
|
||||
}
|
||||
|
||||
/// Accepts a fully-verified and available envelope and imports it into the chain without performing any
|
||||
/// additional verification.
|
||||
///
|
||||
|
||||
@@ -119,12 +119,16 @@ pub struct EnvelopeProcessingSnapshot<E: EthSpec> {
|
||||
/// 1. `Available`: This envelope has been executed and also contains all data to consider it
|
||||
/// fully available.
|
||||
/// 2. `AvailabilityPending`: This envelope hasn't received all required blobs to consider it
|
||||
/// fully available.
|
||||
#[allow(dead_code)]
|
||||
/// fully available. The envelope is still imported (fork-choice marks the block's payload
|
||||
/// as received and the envelope is persisted); column persistence is handled separately
|
||||
/// via gossip / engineGetBlobs as columns arrive.
|
||||
pub enum ExecutedEnvelope<E: EthSpec> {
|
||||
Available(AvailableExecutedEnvelope<E>),
|
||||
// TODO(gloas): check data column availability via DA checker
|
||||
AvailabilityPending(),
|
||||
AvailabilityPending {
|
||||
signed_envelope: Arc<SignedExecutionPayloadEnvelope<E>>,
|
||||
import_data: EnvelopeImportData<E>,
|
||||
payload_verification_outcome: PayloadVerificationOutcome,
|
||||
},
|
||||
}
|
||||
|
||||
impl<E: EthSpec> ExecutedEnvelope<E> {
|
||||
@@ -142,15 +146,14 @@ impl<E: EthSpec> ExecutedEnvelope<E> {
|
||||
payload_verification_outcome,
|
||||
))
|
||||
}
|
||||
// TODO(gloas): check data column availability via DA checker
|
||||
MaybeAvailableEnvelope::AvailabilityPending {
|
||||
block_hash,
|
||||
envelope,
|
||||
} => Self::Available(AvailableExecutedEnvelope::new(
|
||||
AvailableEnvelope::new(block_hash, envelope, vec![], None, spec),
|
||||
block_hash: _,
|
||||
envelope: signed_envelope,
|
||||
} => Self::AvailabilityPending {
|
||||
signed_envelope,
|
||||
import_data,
|
||||
payload_verification_outcome,
|
||||
)),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3978,8 +3978,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
// register_process_result_metrics(&result, metrics::BlockSource::Gossip, "envelope");
|
||||
|
||||
match &result {
|
||||
Ok(AvailabilityProcessingStatus::Imported(_))
|
||||
| Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => {
|
||||
Ok(AvailabilityProcessingStatus::Imported(block_root)) => {
|
||||
// Notify sync so any pending child lookup awaiting this parent envelope unblocks.
|
||||
self.send_sync_message(SyncMessage::GossipEnvelopeImported {
|
||||
block_root: *block_root,
|
||||
});
|
||||
}
|
||||
Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => {
|
||||
// Nothing to do
|
||||
}
|
||||
Err(e) => match e {
|
||||
|
||||
@@ -9,6 +9,7 @@ use crate::{
|
||||
sync::{SyncMessage, manager::BlockProcessType},
|
||||
};
|
||||
use beacon_chain::block_verification_types::LookupBlock;
|
||||
use beacon_chain::chain_config::ChainConfig;
|
||||
use beacon_chain::custody_context::NodeCustodyType;
|
||||
use beacon_chain::data_column_verification::validate_data_column_sidecar_for_gossip_fulu;
|
||||
use beacon_chain::kzg_utils::blobs_to_data_column_sidecars;
|
||||
@@ -134,7 +135,10 @@ impl TestRig {
|
||||
.fresh_ephemeral_store()
|
||||
.mock_execution_layer()
|
||||
.node_custody_type(NodeCustodyType::Fullnode)
|
||||
.chain_config(<_>::default())
|
||||
.chain_config(ChainConfig {
|
||||
disable_get_blobs: true,
|
||||
..ChainConfig::default()
|
||||
})
|
||||
.build();
|
||||
|
||||
harness.advance_slot();
|
||||
@@ -169,7 +173,10 @@ impl TestRig {
|
||||
.fresh_ephemeral_store()
|
||||
.mock_execution_layer()
|
||||
.node_custody_type(node_custody_type)
|
||||
.chain_config(<_>::default())
|
||||
.chain_config(ChainConfig {
|
||||
disable_get_blobs: true,
|
||||
..ChainConfig::default()
|
||||
})
|
||||
.build();
|
||||
|
||||
harness.advance_slot();
|
||||
@@ -1001,14 +1008,30 @@ async fn data_column_reconstruction_at_deadline() {
|
||||
rig.enqueue_gossip_data_columns(i);
|
||||
}
|
||||
|
||||
// Expect all gossip events + reconstruction
|
||||
let mut expected_events: Vec<WorkType> = (0..min_columns_for_reconstruction)
|
||||
.map(|_| WorkType::GossipDataColumnSidecar)
|
||||
.collect();
|
||||
expected_events.push(WorkType::ColumnReconstruction);
|
||||
|
||||
rig.assert_event_journal_contains_ordered(&expected_events)
|
||||
.await;
|
||||
// Drain the journal until we've seen all gossip events plus at least one
|
||||
// reconstruction. Under real crypto the reprocess queue can dispatch the
|
||||
// reconstruction work item more than once (the second is a no-op via
|
||||
// `reconstruction_started`), so we don't pin the count — we just require >= 1.
|
||||
let gsc: &str = WorkType::GossipDataColumnSidecar.into();
|
||||
let cr: &str = WorkType::ColumnReconstruction.into();
|
||||
let (mut gossip_seen, mut recon_seen) = (0usize, 0usize);
|
||||
let drain = async {
|
||||
while let Some(event) = rig.work_journal_rx.recv().await {
|
||||
if event == gsc {
|
||||
gossip_seen += 1;
|
||||
} else if event == cr {
|
||||
recon_seen += 1;
|
||||
}
|
||||
if gossip_seen == min_columns_for_reconstruction && recon_seen >= 1 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
};
|
||||
if tokio::time::timeout(STANDARD_TIMEOUT, drain).await.is_err() {
|
||||
panic!("timeout: gossip_seen={gossip_seen}, recon_seen={recon_seen}");
|
||||
}
|
||||
assert_eq!(gossip_seen, min_columns_for_reconstruction);
|
||||
assert!(recon_seen >= 1);
|
||||
}
|
||||
|
||||
// Test the column reconstruction is delayed for columns that arrive for a previous slot.
|
||||
|
||||
@@ -154,6 +154,10 @@ pub enum SyncMessage<E: EthSpec> {
|
||||
/// A block's parent is known but its execution payload envelope has not been received yet.
|
||||
UnknownParentEnvelope(PeerId, Arc<SignedBeaconBlock<E>>, Hash256),
|
||||
|
||||
/// An execution payload envelope has been imported via the local gossip path.
|
||||
/// Sync uses this to unblock any child lookups that were awaiting this parent envelope.
|
||||
GossipEnvelopeImported { block_root: Hash256 },
|
||||
|
||||
/// A partial data column with an unknown parent has been received.
|
||||
UnknownParentPartialDataColumn {
|
||||
peer_id: PeerId,
|
||||
@@ -961,6 +965,14 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
}),
|
||||
);
|
||||
}
|
||||
SyncMessage::GossipEnvelopeImported { block_root } => {
|
||||
debug!(
|
||||
%block_root,
|
||||
"Gossip-imported envelope; unblocking awaiting child lookups"
|
||||
);
|
||||
self.block_lookups
|
||||
.continue_envelope_child_lookups(block_root, &mut self.network);
|
||||
}
|
||||
SyncMessage::UnknownParentPartialDataColumn {
|
||||
peer_id,
|
||||
block_root,
|
||||
@@ -1096,6 +1108,21 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
slot: Slot,
|
||||
block_component: BlockComponent<T::EthSpec>,
|
||||
) {
|
||||
// Defensive: if the parent's payload envelope was already received between when
|
||||
// gossip-verification raised `ParentEnvelopeUnknown` and now, no lookup is needed.
|
||||
if self
|
||||
.chain
|
||||
.canonical_head
|
||||
.fork_choice_read_lock()
|
||||
.is_payload_received(&parent_root)
|
||||
{
|
||||
debug!(
|
||||
%block_root,
|
||||
%parent_root,
|
||||
"Parent envelope already received, skipping envelope lookup"
|
||||
);
|
||||
return;
|
||||
}
|
||||
match self.should_search_for_block(Some(slot), &peer_id) {
|
||||
Ok(_) => {
|
||||
if self.block_lookups.search_child_and_parent_envelope(
|
||||
|
||||
@@ -37,7 +37,7 @@ use tokio::sync::mpsc;
|
||||
use tracing::info;
|
||||
use types::{
|
||||
BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, EthSpec, ForkContext, ForkName,
|
||||
Hash256, MinimalEthSpec as E, SignedBeaconBlock, Slot,
|
||||
Hash256, MinimalEthSpec as E, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot,
|
||||
test_utils::{SeedableRng, XorShiftRng},
|
||||
};
|
||||
|
||||
@@ -85,6 +85,9 @@ pub struct SimulateConfig {
|
||||
ee_offline_for_n_range_responses: Option<usize>,
|
||||
/// Disconnect all peers after this many successful BlocksByRange responses.
|
||||
successful_range_responses_before_disconnect: Option<usize>,
|
||||
/// Number of `PayloadEnvelopesByRoot` responses that return an envelope for a
|
||||
/// different block_root than requested.
|
||||
return_wrong_envelopes_n_times: usize,
|
||||
}
|
||||
|
||||
impl SimulateConfig {
|
||||
@@ -116,6 +119,11 @@ impl SimulateConfig {
|
||||
self
|
||||
}
|
||||
|
||||
fn return_wrong_envelope_once(mut self) -> Self {
|
||||
self.return_wrong_envelopes_n_times = 1;
|
||||
self
|
||||
}
|
||||
|
||||
fn return_wrong_sidecar_for_block_once(mut self) -> Self {
|
||||
self.return_wrong_sidecar_for_block_n_times = 1;
|
||||
self
|
||||
@@ -209,6 +217,9 @@ pub(crate) struct TestRigConfig {
|
||||
fulu_test_type: FuluTestType,
|
||||
/// Override the node custody type derived from `fulu_test_type`
|
||||
node_custody_type_override: Option<NodeCustodyType>,
|
||||
/// Override the number of validators in the harness genesis state. Defaults to 1.
|
||||
/// Some forks (e.g. Gloas) cannot initialise a state with a single validator.
|
||||
validator_count_override: Option<usize>,
|
||||
}
|
||||
|
||||
impl TestRig {
|
||||
@@ -222,9 +233,9 @@ impl TestRig {
|
||||
);
|
||||
|
||||
// Initialise a new beacon chain
|
||||
let harness = BeaconChainHarness::<EphemeralHarnessType<E>>::builder(E)
|
||||
let mut builder = BeaconChainHarness::<EphemeralHarnessType<E>>::builder(E)
|
||||
.spec(spec.clone())
|
||||
.deterministic_keypairs(1)
|
||||
.deterministic_keypairs(test_rig_config.validator_count_override.unwrap_or(1))
|
||||
.fresh_ephemeral_store()
|
||||
.mock_execution_layer()
|
||||
.testing_slot_clock(clock.clone())
|
||||
@@ -232,8 +243,17 @@ impl TestRig {
|
||||
test_rig_config
|
||||
.node_custody_type_override
|
||||
.unwrap_or_else(|| test_rig_config.fulu_test_type.we_node_custody_type()),
|
||||
)
|
||||
.build();
|
||||
);
|
||||
// Post-Electra forks need validators with effective balance close to
|
||||
// `max_effective_balance_electra` for balance-weighted committee
|
||||
// selection (sync committee, PTC) to converge during genesis.
|
||||
if spec.electra_fork_epoch == Some(types::Epoch::new(0)) {
|
||||
let max_eb = spec.max_effective_balance_electra;
|
||||
builder = builder.with_genesis_state_builder(move |b| {
|
||||
b.set_initial_balance_fn(Box::new(move |_| max_eb))
|
||||
});
|
||||
}
|
||||
let harness = builder.build();
|
||||
|
||||
let chain = harness.chain.clone();
|
||||
let fork_context = Arc::new(ForkContext::new::<E>(
|
||||
@@ -305,6 +325,7 @@ impl TestRig {
|
||||
fork_name,
|
||||
network_blocks_by_root: <_>::default(),
|
||||
network_blocks_by_slot: <_>::default(),
|
||||
network_envelopes_by_root: <_>::default(),
|
||||
penalties: <_>::default(),
|
||||
seen_lookups: <_>::default(),
|
||||
requests: <_>::default(),
|
||||
@@ -319,6 +340,7 @@ impl TestRig {
|
||||
Self::new(TestRigConfig {
|
||||
fulu_test_type: FuluTestType::WeFullnodeThemSupernode,
|
||||
node_custody_type_override: None,
|
||||
validator_count_override: None,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -327,6 +349,7 @@ impl TestRig {
|
||||
Self::new(TestRigConfig {
|
||||
fulu_test_type: FuluTestType::WeFullnodeThemSupernode,
|
||||
node_custody_type_override: Some(node_custody_type),
|
||||
validator_count_override: None,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -429,9 +452,9 @@ impl TestRig {
|
||||
process_fn.await
|
||||
}
|
||||
}
|
||||
Work::RpcBlobs { process_fn } | Work::RpcCustodyColumn(process_fn) => {
|
||||
process_fn.await
|
||||
}
|
||||
Work::RpcBlobs { process_fn }
|
||||
| Work::RpcCustodyColumn(process_fn)
|
||||
| Work::RpcPayloadEnvelope { process_fn } => process_fn.await,
|
||||
Work::ChainSegment {
|
||||
process_fn,
|
||||
process_id: (chain_id, batch_epoch),
|
||||
@@ -671,6 +694,45 @@ impl TestRig {
|
||||
self.send_rpc_columns_response(req_id, peer_id, &columns);
|
||||
}
|
||||
|
||||
(RequestType::PayloadEnvelopesByRoot(req), AppRequestId::Sync(req_id)) => {
|
||||
if self.complete_strategy.return_no_data_n_times > 0 {
|
||||
self.complete_strategy.return_no_data_n_times -= 1;
|
||||
return self.send_rpc_envelopes_response(req_id, peer_id, &[]);
|
||||
}
|
||||
|
||||
if self.complete_strategy.return_wrong_envelopes_n_times > 0 {
|
||||
self.complete_strategy.return_wrong_envelopes_n_times -= 1;
|
||||
// Return any envelope that doesn't match the request, so the
|
||||
// request items layer raises `UnrequestedBlockRoot`.
|
||||
let requested = req
|
||||
.beacon_block_roots
|
||||
.iter()
|
||||
.copied()
|
||||
.collect::<HashSet<_>>();
|
||||
let wrong = self
|
||||
.network_envelopes_by_root
|
||||
.iter()
|
||||
.find(|(root, _)| !requested.contains(*root))
|
||||
.map(|(_, envelope)| envelope.clone())
|
||||
.expect("test fixture must produce at least one extra envelope");
|
||||
return self.send_rpc_envelopes_response(req_id, peer_id, &[wrong]);
|
||||
}
|
||||
|
||||
let envelopes = req
|
||||
.beacon_block_roots
|
||||
.iter()
|
||||
.map(|block_root| {
|
||||
self.network_envelopes_by_root
|
||||
.get(block_root)
|
||||
.unwrap_or_else(|| {
|
||||
panic!("Test consumer requested unknown envelope: {block_root:?}")
|
||||
})
|
||||
.clone()
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
self.send_rpc_envelopes_response(req_id, peer_id, &envelopes);
|
||||
}
|
||||
|
||||
(RequestType::BlocksByRange(req), AppRequestId::Sync(req_id)) => {
|
||||
if self.complete_strategy.skip_by_range_routes {
|
||||
return;
|
||||
@@ -894,6 +956,36 @@ impl TestRig {
|
||||
});
|
||||
}
|
||||
|
||||
fn send_rpc_envelopes_response(
|
||||
&mut self,
|
||||
sync_request_id: SyncRequestId,
|
||||
peer_id: PeerId,
|
||||
envelopes: &[Arc<SignedExecutionPayloadEnvelope<E>>],
|
||||
) {
|
||||
let block_roots = envelopes
|
||||
.iter()
|
||||
.map(|e| e.beacon_block_root())
|
||||
.collect::<Vec<_>>();
|
||||
self.log(&format!(
|
||||
"Completing request {sync_request_id:?} to {peer_id} with envelopes for {block_roots:?}"
|
||||
));
|
||||
|
||||
for envelope in envelopes {
|
||||
self.push_sync_message(SyncMessage::RpcPayloadEnvelope {
|
||||
sync_request_id,
|
||||
peer_id,
|
||||
envelope: Some(envelope.clone()),
|
||||
seen_timestamp: D,
|
||||
});
|
||||
}
|
||||
self.push_sync_message(SyncMessage::RpcPayloadEnvelope {
|
||||
sync_request_id,
|
||||
peer_id,
|
||||
envelope: None,
|
||||
seen_timestamp: D,
|
||||
});
|
||||
}
|
||||
|
||||
fn send_rpc_columns_response(
|
||||
&mut self,
|
||||
sync_request_id: SyncRequestId,
|
||||
@@ -936,16 +1028,25 @@ impl TestRig {
|
||||
pub(super) async fn build_chain(&mut self, block_count: usize) -> Hash256 {
|
||||
let mut blocks = vec![];
|
||||
|
||||
// Initialise a new beacon chain
|
||||
let external_harness = BeaconChainHarness::<EphemeralHarnessType<E>>::builder(E)
|
||||
// Initialise a new beacon chain. Match the local harness's validator count and
|
||||
// balance hooks so post-Electra forks (where genesis-time committee selection is
|
||||
// balance-weighted) can initialise.
|
||||
let validator_count = self.harness.validator_keypairs.len();
|
||||
let mut builder = BeaconChainHarness::<EphemeralHarnessType<E>>::builder(E)
|
||||
.spec(self.harness.spec.clone())
|
||||
.deterministic_keypairs(1)
|
||||
.deterministic_keypairs(validator_count)
|
||||
.fresh_ephemeral_store()
|
||||
.mock_execution_layer()
|
||||
.testing_slot_clock(self.harness.chain.slot_clock.clone())
|
||||
// Make the external harness a supernode so all columns are available
|
||||
.node_custody_type(NodeCustodyType::Supernode)
|
||||
.build();
|
||||
.node_custody_type(NodeCustodyType::Supernode);
|
||||
if self.harness.spec.electra_fork_epoch == Some(types::Epoch::new(0)) {
|
||||
let max_eb = self.harness.spec.max_effective_balance_electra;
|
||||
builder = builder.with_genesis_state_builder(move |b| {
|
||||
b.set_initial_balance_fn(Box::new(move |_| max_eb))
|
||||
});
|
||||
}
|
||||
let external_harness = builder.build();
|
||||
// Ensure all blocks have data. Otherwise, the triggers for unknown blob parent and unknown
|
||||
// data column parent fail.
|
||||
external_harness
|
||||
@@ -974,6 +1075,16 @@ impl TestRig {
|
||||
self.network_blocks_by_root
|
||||
.insert(block_root, block.clone());
|
||||
self.network_blocks_by_slot.insert(block_slot, block);
|
||||
// Post-Gloas, also capture the execution payload envelope so peers can serve it.
|
||||
if self.is_after_gloas()
|
||||
&& let Ok(Some(envelope)) = external_harness
|
||||
.chain
|
||||
.store
|
||||
.get_payload_envelope(&block_root)
|
||||
{
|
||||
self.network_envelopes_by_root
|
||||
.insert(block_root, Arc::new(envelope));
|
||||
}
|
||||
self.log(&format!(
|
||||
"Produced block {} index {i} in external harness",
|
||||
block_slot,
|
||||
@@ -1002,6 +1113,21 @@ impl TestRig {
|
||||
self.re_insert_block(Arc::new(block), blobs, columns);
|
||||
}
|
||||
|
||||
/// Replace the cached envelope's signature for `block_root` with one signed by an
|
||||
/// unrelated key, so it fails verification against the proposer's pubkey.
|
||||
fn corrupt_envelope_signature_for(&mut self, block_root: Hash256) {
|
||||
let envelope = self
|
||||
.network_envelopes_by_root
|
||||
.get(&block_root)
|
||||
.expect("no envelope cached for block_root")
|
||||
.as_ref()
|
||||
.clone();
|
||||
let mut envelope = envelope;
|
||||
envelope.signature = self.valid_signature();
|
||||
self.network_envelopes_by_root
|
||||
.insert(block_root, Arc::new(envelope));
|
||||
}
|
||||
|
||||
fn valid_signature(&mut self) -> bls::Signature {
|
||||
let keypair = bls::Keypair::random();
|
||||
let msg = Hash256::random();
|
||||
@@ -1178,6 +1304,32 @@ impl TestRig {
|
||||
self.harness.chain.recompute_head_at_current_slot().await;
|
||||
}
|
||||
|
||||
/// Persist a Gloas execution payload envelope into the local chain and mark the
|
||||
/// block as "payload received" in fork choice. Mimics the side-effects of the
|
||||
/// gossip-import path, including the `GossipEnvelopeImported` sync notification.
|
||||
/// The caller is responsible for ensuring the corresponding beacon block is
|
||||
/// already imported.
|
||||
async fn import_envelope_for_block_root(&mut self, block_root: Hash256) {
|
||||
let envelope = self
|
||||
.network_envelopes_by_root
|
||||
.get(&block_root)
|
||||
.unwrap_or_else(|| panic!("no envelope cached for {block_root:?}"))
|
||||
.as_ref()
|
||||
.clone();
|
||||
self.harness
|
||||
.chain
|
||||
.store
|
||||
.put_payload_envelope(&block_root, &envelope)
|
||||
.expect("should store envelope");
|
||||
self.harness
|
||||
.chain
|
||||
.canonical_head
|
||||
.fork_choice_write_lock()
|
||||
.on_valid_payload_envelope_received(block_root)
|
||||
.expect("should update fork choice with envelope");
|
||||
self.push_sync_message(SyncMessage::GossipEnvelopeImported { block_root });
|
||||
}
|
||||
|
||||
/// Import a block directly into the chain without going through lookup sync
|
||||
async fn import_block_by_root(&mut self, block_root: Hash256) {
|
||||
let range_sync_block = self
|
||||
@@ -1444,6 +1596,7 @@ impl TestRig {
|
||||
Self::new(TestRigConfig {
|
||||
fulu_test_type,
|
||||
node_custody_type_override: None,
|
||||
validator_count_override: None,
|
||||
})
|
||||
})
|
||||
}
|
||||
@@ -1460,6 +1613,22 @@ impl TestRig {
|
||||
self.fork_name.fulu_enabled()
|
||||
}
|
||||
|
||||
pub fn is_after_gloas(&self) -> bool {
|
||||
self.fork_name.gloas_enabled()
|
||||
}
|
||||
|
||||
fn new_after_gloas() -> Option<Self> {
|
||||
// Gloas requires more than 1 validator to initialise the genesis state
|
||||
// (committee/sampling computations fail with `InvalidIndicesCount`).
|
||||
genesis_fork().gloas_enabled().then(|| {
|
||||
Self::new(TestRigConfig {
|
||||
fulu_test_type: FuluTestType::WeFullnodeThemSupernode,
|
||||
node_custody_type_override: None,
|
||||
validator_count_override: Some(1024),
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
fn trigger_unknown_parent_block(&mut self, peer_id: PeerId, block: Arc<SignedBeaconBlock<E>>) {
|
||||
let block_root = block.canonical_root();
|
||||
self.send_sync_message(SyncMessage::UnknownParentBlock(peer_id, block, block_root))
|
||||
@@ -1483,6 +1652,18 @@ impl TestRig {
|
||||
));
|
||||
}
|
||||
|
||||
/// Trigger an envelope-unknown lookup for the last block in the chain. Caller is
|
||||
/// expected to have already imported the parent block (via `import_blocks_up_to_slot`)
|
||||
/// without registering its envelope.
|
||||
fn trigger_with_last_unknown_parent_envelope(&mut self) {
|
||||
let peer_id = self.new_connected_supernode_peer();
|
||||
let last_block = self.get_last_block().block_cloned();
|
||||
let block_root = last_block.canonical_root();
|
||||
self.send_sync_message(SyncMessage::UnknownParentEnvelope(
|
||||
peer_id, last_block, block_root,
|
||||
));
|
||||
}
|
||||
|
||||
fn rand_block(&mut self) -> SignedBeaconBlock<E> {
|
||||
self.rand_block_and_blobs(NumBlobs::None).0
|
||||
}
|
||||
@@ -2639,3 +2820,172 @@ async fn crypto_on_fail_with_bad_column_kzg_proof() {
|
||||
r.assert_penalties_of_type("lookup_custody_column_processing_failure");
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Gloas: parent envelope unknown lookup
|
||||
// ---------------------------------------------------------------------------
|
||||
//
|
||||
// These tests exercise the lookup-sync state machine introduced in PR #9039:
|
||||
// when a gossip block's parent execution payload envelope is missing,
|
||||
// `SyncManager` is expected to create two single-block lookups — an envelope-only
|
||||
// lookup for the parent block_root and a "child" lookup that holds the gossip
|
||||
// block and waits on `AwaitingParent::Envelope(parent_root)`. The envelope-only
|
||||
// lookup issues a `PayloadEnvelopesByRoot` RPC; on completion it unblocks the
|
||||
// child via `continue_envelope_child_lookups`.
|
||||
//
|
||||
// The tests below cover lookup creation, RPC routing, and drop-cascade
|
||||
// behaviour. The end-to-end happy path is gated on
|
||||
// `process_execution_payload_envelope` supporting `AvailabilityPending` (today
|
||||
// it returns `InternalError("Pending payload envelope not yet implemented")`),
|
||||
// which is tracked separately. See `process_rpc_envelope` in `sync_methods.rs`.
|
||||
|
||||
/// Builds a 2-block gloas chain in the external harness and locally imports block 1
|
||||
/// (parent) WITHOUT registering its envelope, leaving `is_payload_received(parent_root)`
|
||||
/// false — the precondition for `BlockError::ParentEnvelopeUnknown`.
|
||||
async fn setup_unknown_parent_envelope_scenario() -> Option<TestRig> {
|
||||
let mut r = TestRig::new_after_gloas()?;
|
||||
r.build_chain(2).await;
|
||||
r.import_blocks_up_to_slot(1).await;
|
||||
Some(r)
|
||||
}
|
||||
|
||||
fn payload_envelope_request_count(rig: &TestRig) -> usize {
|
||||
rig.requests
|
||||
.iter()
|
||||
.filter(|(request, _)| matches!(request, RequestType::PayloadEnvelopesByRoot(_)))
|
||||
.count()
|
||||
}
|
||||
|
||||
/// Triggering `UnknownParentEnvelope` creates exactly two lookups: an envelope-only
|
||||
/// lookup for the parent and a child lookup for the gossip block awaiting that envelope.
|
||||
#[tokio::test]
|
||||
async fn unknown_parent_envelope_creates_two_lookups() {
|
||||
let Some(mut r) = setup_unknown_parent_envelope_scenario().await else {
|
||||
return;
|
||||
};
|
||||
r.trigger_with_last_unknown_parent_envelope();
|
||||
r.assert_single_lookups_count(2);
|
||||
}
|
||||
|
||||
/// Repeated `UnknownParentEnvelope` triggers for the same parent must not spawn extra
|
||||
/// lookups (peers are merged into the existing envelope lookup).
|
||||
#[tokio::test]
|
||||
async fn happy_path_unknown_parent_envelope_multiple_triggers() {
|
||||
let Some(mut r) = setup_unknown_parent_envelope_scenario().await else {
|
||||
return;
|
||||
};
|
||||
r.trigger_with_last_unknown_parent_envelope();
|
||||
r.trigger_with_last_unknown_parent_envelope();
|
||||
r.assert_single_lookups_count(2);
|
||||
}
|
||||
|
||||
/// The envelope-only lookup must dispatch a `PayloadEnvelopesByRoot` RPC for the
|
||||
/// parent block_root.
|
||||
#[tokio::test]
|
||||
async fn envelope_lookup_issues_by_root_rpc() {
|
||||
let Some(mut r) = setup_unknown_parent_envelope_scenario().await else {
|
||||
return;
|
||||
};
|
||||
r.trigger_with_last_unknown_parent_envelope();
|
||||
r.simulate(SimulateConfig::new()).await;
|
||||
assert_eq!(
|
||||
payload_envelope_request_count(&r),
|
||||
1,
|
||||
"expected exactly one PayloadEnvelopesByRoot request"
|
||||
);
|
||||
}
|
||||
|
||||
/// One transient RPC error on the envelope request → lookup retries with the same peer
|
||||
/// and completes successfully. Mirrors the `bad_peer_rpc_failure` shape used for blocks.
|
||||
#[tokio::test]
|
||||
async fn bad_peer_envelope_rpc_failure() {
|
||||
let Some(mut r) = setup_unknown_parent_envelope_scenario().await else {
|
||||
return;
|
||||
};
|
||||
r.trigger_with_last_unknown_parent_envelope();
|
||||
r.simulate(SimulateConfig::new().return_rpc_error(RPCError::IoError("test".into())))
|
||||
.await;
|
||||
r.assert_successful_lookup_sync();
|
||||
r.assert_head_slot(2);
|
||||
}
|
||||
|
||||
/// Peer responds once with an envelope for a different block_root than requested.
|
||||
/// The request-items layer raises `UnrequestedBlockRoot`, the peer is penalised, and
|
||||
/// the lookup retries successfully on the next request.
|
||||
#[tokio::test]
|
||||
async fn bad_peer_wrong_envelope_response() {
|
||||
let Some(mut r) = setup_unknown_parent_envelope_scenario().await else {
|
||||
return;
|
||||
};
|
||||
r.trigger_with_last_unknown_parent_envelope();
|
||||
r.simulate(SimulateConfig::new().return_wrong_envelope_once())
|
||||
.await;
|
||||
r.assert_penalties_of_type("UnrequestedBlockRoot");
|
||||
r.assert_successful_lookup_sync();
|
||||
r.assert_head_slot(2);
|
||||
}
|
||||
|
||||
/// Trigger `UnknownParentEnvelope` when the parent's payload envelope is already
|
||||
/// in fork choice. Sync should treat the trigger as a no-op and create no lookups.
|
||||
#[tokio::test]
|
||||
async fn envelope_already_received_skips_lookup() {
|
||||
let Some(mut r) = setup_unknown_parent_envelope_scenario().await else {
|
||||
return;
|
||||
};
|
||||
let parent_root = r.get_last_block().block_cloned().parent_root();
|
||||
r.import_envelope_for_block_root(parent_root).await;
|
||||
r.trigger_with_last_unknown_parent_envelope();
|
||||
r.assert_single_lookups_count(0);
|
||||
}
|
||||
|
||||
/// End-to-end: an envelope-only RPC lookup completes, the cached child block is
|
||||
/// processed, and the head advances to the gossip block.
|
||||
#[tokio::test]
|
||||
async fn happy_path_unknown_parent_envelope() {
|
||||
let Some(mut r) = setup_unknown_parent_envelope_scenario().await else {
|
||||
return;
|
||||
};
|
||||
r.trigger_with_last_unknown_parent_envelope();
|
||||
r.simulate(SimulateConfig::happy_path()).await;
|
||||
r.assert_successful_lookup_sync();
|
||||
r.assert_head_slot(2);
|
||||
r.assert_no_penalties();
|
||||
}
|
||||
|
||||
/// While an envelope-only RPC lookup is pending, the same envelope is imported
|
||||
/// via the gossip path. The child lookup should still unblock and import.
|
||||
#[tokio::test]
|
||||
async fn happy_path_unknown_parent_envelope_via_gossip() {
|
||||
let Some(mut r) = setup_unknown_parent_envelope_scenario().await else {
|
||||
return;
|
||||
};
|
||||
let parent_root = r.get_last_block().block_cloned().parent_root();
|
||||
r.trigger_with_last_unknown_parent_envelope();
|
||||
// Import the envelope via the local gossip path before any RPC response arrives.
|
||||
r.import_envelope_for_block_root(parent_root).await;
|
||||
r.simulate(SimulateConfig::happy_path()).await;
|
||||
r.assert_successful_lookup_sync();
|
||||
r.assert_head_slot(2);
|
||||
}
|
||||
|
||||
/// Peer returns the requested envelope but with a corrupted signature. Gossip
|
||||
/// verification rejects it; the lookup retries (single peer → exhaust → drop)
|
||||
/// and reports `lookup_envelope_processing_failure` against the peer.
|
||||
#[tokio::test]
|
||||
async fn crypto_on_fail_with_bad_envelope_signature() {
|
||||
let Some(mut r) = setup_unknown_parent_envelope_scenario().await else {
|
||||
return;
|
||||
};
|
||||
let parent_root = r.get_last_block().block_cloned().parent_root();
|
||||
r.corrupt_envelope_signature_for(parent_root);
|
||||
r.trigger_with_last_unknown_parent_envelope();
|
||||
r.simulate(SimulateConfig::happy_path()).await;
|
||||
if cfg!(feature = "fake_crypto") {
|
||||
// Under fake_crypto, signature checks are no-ops, so a "corrupted"
|
||||
// signature still passes. Skip — analogous to the existing
|
||||
// `crypto_on_fail_with_invalid_block_signature` test.
|
||||
return;
|
||||
}
|
||||
r.assert_failed_lookup_sync();
|
||||
r.assert_penalties_of_type("lookup_envelope_processing_failure");
|
||||
}
|
||||
|
||||
@@ -22,7 +22,7 @@ use tokio::sync::mpsc;
|
||||
use tracing_subscriber::fmt::MakeWriter;
|
||||
use tracing_subscriber::layer::SubscriberExt;
|
||||
use tracing_subscriber::util::SubscriberInitExt;
|
||||
use types::{ForkName, Hash256, MinimalEthSpec as E, Slot};
|
||||
use types::{ForkName, Hash256, MinimalEthSpec as E, SignedExecutionPayloadEnvelope, Slot};
|
||||
|
||||
mod lookups;
|
||||
mod range;
|
||||
@@ -79,6 +79,8 @@ struct TestRig {
|
||||
/// Blocks that will be used in the test but may not be known to `harness` yet.
|
||||
network_blocks_by_root: HashMap<Hash256, RangeSyncBlock<E>>,
|
||||
network_blocks_by_slot: HashMap<Slot, RangeSyncBlock<E>>,
|
||||
/// Execution payload envelopes (Gloas) keyed by beacon block root, available to peers.
|
||||
network_envelopes_by_root: HashMap<Hash256, Arc<SignedExecutionPayloadEnvelope<E>>>,
|
||||
penalties: Vec<ReportedPenalty>,
|
||||
/// All seen lookups through the test run
|
||||
seen_lookups: HashMap<Id, SeenLookup>,
|
||||
|
||||
Reference in New Issue
Block a user