merge conflicts

This commit is contained in:
Eitan Seri- Levi
2026-04-03 09:26:40 -07:00
144 changed files with 8530 additions and 3436 deletions

View File

@@ -59,6 +59,8 @@ use crate::observed_block_producers::ObservedBlockProducers;
use crate::observed_data_sidecars::ObservedDataSidecars;
use crate::observed_operations::{ObservationOutcome, ObservedOperations};
use crate::observed_slashable::ObservedSlashable;
#[cfg(not(test))]
use crate::payload_envelope_streamer::{EnvelopeRequestSource, launch_payload_envelope_stream};
use crate::pending_payload_envelopes::PendingPayloadEnvelopes;
use crate::persisted_beacon_chain::PersistedBeaconChain;
use crate::persisted_custody::persist_custody_context;
@@ -133,7 +135,7 @@ use store::{
};
use task_executor::{RayonPoolType, ShutdownReason, TaskExecutor};
use tokio_stream::Stream;
use tracing::{Span, debug, debug_span, error, info, info_span, instrument, trace, warn};
use tracing::{debug, debug_span, error, info, info_span, instrument, trace, warn};
use tree_hash::TreeHash;
use types::data::{ColumnIndex, FixedBlobSidecarList};
use types::execution::BlockProductionVersion;
@@ -1140,6 +1142,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map_or_else(|| self.get_blobs(block_root), Ok)
}
#[cfg(not(test))]
#[allow(clippy::type_complexity)]
pub fn get_payload_envelopes(
self: &Arc<Self>,
block_roots: Vec<Hash256>,
request_source: EnvelopeRequestSource,
) -> impl Stream<
Item = (
Hash256,
Arc<Result<Option<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>, Error>>,
),
> {
launch_payload_envelope_stream(self.clone(), block_roots, request_source)
}
pub fn get_data_columns_checking_all_caches(
&self,
block_root: Hash256,
@@ -1457,7 +1474,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.proto_array()
.heads_descended_from_finalization::<T::EthSpec>(fork_choice.finalized_checkpoint())
.iter()
.map(|node| (node.root, node.slot))
.map(|node| (node.root(), node.slot()))
.collect()
}
@@ -2287,6 +2304,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.slot()?,
verified.indexed_attestation().to_ref(),
AttestationFromBlock::False,
&self.spec,
)
.map_err(Into::into)
}
@@ -2750,6 +2768,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// or already-known).
///
/// This method is potentially long-running and should not run on the core executor.
#[instrument(skip_all, level = "debug")]
pub fn filter_chain_segment(
self: &Arc<Self>,
chain_segment: Vec<RangeSyncBlock<T::EthSpec>>,
@@ -2877,12 +2896,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Filter uninteresting blocks from the chain segment in a blocking task.
let chain = self.clone();
let filter_chain_segment = debug_span!("filter_chain_segment");
let filtered_chain_segment_future = self.spawn_blocking_handle(
move || {
let _guard = filter_chain_segment.enter();
chain.filter_chain_segment(chain_segment)
},
move || chain.filter_chain_segment(chain_segment),
"filter_chain_segment",
);
let mut filtered_chain_segment = match filtered_chain_segment_future.await {
@@ -2913,12 +2928,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
std::mem::swap(&mut blocks, &mut filtered_chain_segment);
let chain = self.clone();
let current_span = Span::current();
let signature_verification_future = self.spawn_blocking_handle(
move || {
let _guard = current_span.enter();
signature_verify_chain_segment(blocks, &chain)
},
move || signature_verify_chain_segment(blocks, &chain),
"signature_verify_chain_segment",
);
@@ -3008,12 +3019,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block: Arc<SignedBeaconBlock<T::EthSpec>>,
) -> Result<GossipVerifiedBlock<T>, BlockError> {
let chain = self.clone();
let span = Span::current();
self.task_executor
.clone()
.spawn_blocking_handle(
move || {
let _guard = span.enter();
let slot = block.slot();
let graffiti_string = block.message().body().graffiti().as_utf8_lossy();
@@ -3326,11 +3335,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let data_availability_checker = self.data_availability_checker.clone();
let current_span = Span::current();
let result = self
.task_executor
.spawn_blocking_with_rayon_async(RayonPoolType::HighPriority, move || {
let _guard = current_span.enter();
data_availability_checker.reconstruct_data_columns(&block_root, slot)
})
.await
@@ -3852,7 +3859,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
consensus_context,
} = import_data;
// Record the time at which this block's blobs became available.
// Record the time at which this block's blobs/data columns became available.
if let Some(blobs_available) = block.blobs_available_timestamp() {
self.block_times_cache.write().set_time_blob_observed(
block_root,
@@ -3861,16 +3868,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
);
}
// TODO(das) record custody column available timestamp
let block_root = {
// Capture the current span before moving into the blocking task
let current_span = tracing::Span::current();
let chain = self.clone();
self.spawn_blocking_handle(
move || {
// Enter the captured span in the blocking thread
let _guard = current_span.enter();
chain.import_block(
block,
block_root,
@@ -4002,7 +4003,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let fork_choice_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_FORK_CHOICE);
match fork_choice.get_head(current_slot, &self.spec) {
// This block became the head, add it to the early attester cache.
Ok(new_head_root) if new_head_root == block_root => {
Ok((new_head_root, _)) if new_head_root == block_root => {
if let Some(proto_block) = fork_choice.get_block(&block_root) {
let new_head_is_optimistic =
proto_block.execution_status.is_optimistic_or_invalid();
@@ -4581,15 +4582,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
//
// Load the parent state from disk.
let chain = self.clone();
let span = Span::current();
let (state, state_root_opt) = self
.task_executor
.spawn_blocking_handle(
move || {
let _guard =
debug_span!(parent: span, "load_state_for_block_production").entered();
chain.load_state_for_block_production(slot)
},
move || chain.load_state_for_block_production(slot),
"load_state_for_block_production",
)
.ok_or(BlockProductionError::ShuttingDown)?
@@ -4807,6 +4803,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
})
}
// TODO(gloas): wrong for Gloas, needs an update
pub fn overridden_forkchoice_update_params_or_failure_reason(
&self,
canonical_forkchoice_params: &ForkchoiceUpdateParameters,
@@ -4841,7 +4838,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// The slot of our potential re-org block is always 1 greater than the head block because we
// only attempt single-slot re-orgs.
let head_slot = info.head_node.slot;
let head_slot = info.head_node.slot();
let re_org_block_slot = head_slot + 1;
let fork_choice_slot = info.current_slot;
@@ -4876,9 +4873,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.fork_name_at_slot::<T::EthSpec>(re_org_block_slot)
.fulu_enabled()
{
info.head_node.current_epoch_shuffling_id
info.head_node.current_epoch_shuffling_id()
} else {
info.head_node.next_epoch_shuffling_id
info.head_node.next_epoch_shuffling_id()
}
.shuffling_decision_block;
let proposer_index = self
@@ -4904,13 +4901,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Err(Box::new(DoNotReOrg::NotProposing.into()));
}
// If the current slot is already equal to the proposal slot (or we are in the tail end of
// the prior slot), then check the actual weight of the head against the head re-org threshold
// and the actual weight of the parent against the parent re-org threshold.
// TODO(gloas): reorg weight logic needs updating for Gloas. For now use
// total weight which is correct for pre-Gloas and conservative for post-Gloas.
let head_weight = info.head_node.weight();
let parent_weight = info.parent_node.weight();
let (head_weak, parent_strong) = if fork_choice_slot == re_org_block_slot {
(
info.head_node.weight < info.re_org_head_weight_threshold,
info.parent_node.weight > info.re_org_parent_weight_threshold,
head_weight < info.re_org_head_weight_threshold,
parent_weight > info.re_org_parent_weight_threshold,
)
} else {
(true, true)
@@ -4918,7 +4917,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
if !head_weak {
return Err(Box::new(
DoNotReOrg::HeadNotWeak {
head_weight: info.head_node.weight,
head_weight,
re_org_head_weight_threshold: info.re_org_head_weight_threshold,
}
.into(),
@@ -4927,7 +4926,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
if !parent_strong {
return Err(Box::new(
DoNotReOrg::ParentNotStrong {
parent_weight: info.parent_node.weight,
parent_weight,
re_org_parent_weight_threshold: info.re_org_parent_weight_threshold,
}
.into(),
@@ -4945,9 +4944,16 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Err(Box::new(DoNotReOrg::HeadNotLate.into()));
}
let parent_head_hash = info.parent_node.execution_status.block_hash();
// TODO(gloas): V29 nodes don't carry execution_status, so this returns
// None for post-Gloas re-orgs. Need to source the EL block hash from
// the bid's block_hash instead. Re-org is disabled for Gloas for now.
let parent_head_hash = info
.parent_node
.execution_status()
.ok()
.and_then(|execution_status| execution_status.block_hash());
let forkchoice_update_params = ForkchoiceUpdateParameters {
head_root: info.parent_node.root,
head_root: info.parent_node.root(),
head_hash: parent_head_hash,
justified_hash: canonical_forkchoice_params.justified_hash,
finalized_hash: canonical_forkchoice_params.finalized_hash,
@@ -4955,7 +4961,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
debug!(
canonical_head = ?head_block_root,
?info.parent_node.root,
parent_root = ?info.parent_node.root(),
slot = %fork_choice_slot,
"Fork choice update overridden"
);
@@ -5013,13 +5019,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.graffiti_calculator
.get_graffiti(graffiti_settings)
.await;
let span = Span::current();
let mut partial_beacon_block = self
.task_executor
.spawn_blocking_handle(
move || {
let _guard =
debug_span!(parent: span, "produce_partial_beacon_block").entered();
chain.produce_partial_beacon_block(
state,
state_root_opt,
@@ -5055,14 +5058,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
match block_contents_type {
BlockProposalContentsType::Full(block_contents) => {
let chain = self.clone();
let span = Span::current();
let beacon_block_response = self
.task_executor
.spawn_blocking_handle(
move || {
let _guard =
debug_span!(parent: span, "complete_partial_beacon_block")
.entered();
chain.complete_partial_beacon_block(
partial_beacon_block,
Some(block_contents),
@@ -5079,14 +5078,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
BlockProposalContentsType::Blinded(block_contents) => {
let chain = self.clone();
let span = Span::current();
let beacon_block_response = self
.task_executor
.spawn_blocking_handle(
move || {
let _guard =
debug_span!(parent: span, "complete_partial_beacon_block")
.entered();
chain.complete_partial_beacon_block(
partial_beacon_block,
Some(block_contents),
@@ -5104,13 +5099,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
} else {
let chain = self.clone();
let span = Span::current();
let beacon_block_response = self
.task_executor
.spawn_blocking_handle(
move || {
let _guard =
debug_span!(parent: span, "complete_partial_beacon_block").entered();
chain.complete_partial_beacon_block(
partial_beacon_block,
None,
@@ -5128,6 +5120,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all, level = "debug")]
fn produce_partial_beacon_block(
self: &Arc<Self>,
mut state: BeaconState<T::EthSpec>,
@@ -5372,6 +5365,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
})
}
#[instrument(skip_all, level = "debug")]
fn complete_partial_beacon_block<Payload: AbstractExecPayload<T::EthSpec>>(
&self,
partial_beacon_block: PartialBeaconBlock<T::EthSpec>,
@@ -6757,6 +6751,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let mut prev_block_root = None;
let mut prev_beacon_state = None;
// Collect all blocks.
let mut blocks = vec![];
for res in self.forwards_iter_block_roots(from_slot)? {
let (beacon_block_root, _) = res?;
@@ -6772,16 +6769,42 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.ok_or_else(|| {
Error::DBInconsistent(format!("Missing block {}", beacon_block_root))
})?;
let beacon_state_root = beacon_block.state_root();
blocks.push((beacon_block_root, Arc::new(beacon_block)));
}
// Collect states, using the next blocks to determine if states are full (have Gloas
// payloads).
for (i, (block_root, block)) in blocks.iter().enumerate() {
let (opt_envelope, state_root) = if block.fork_name_unchecked().gloas_enabled() {
let opt_envelope = self.store.get_payload_envelope(block_root)?.map(Arc::new);
if let Some((_, next_block)) = blocks.get(i + 1) {
let block_hash = block.payload_bid_block_hash()?;
if next_block.is_parent_block_full(block_hash) {
let envelope = opt_envelope.ok_or_else(|| {
Error::DBInconsistent(format!("Missing envelope {block_root:?}"))
})?;
let state_root = envelope.message.state_root;
(Some(envelope), state_root)
} else {
(None, block.state_root())
}
} else {
// TODO(gloas): should use fork choice/cached head for last block in sequence
opt_envelope
.as_ref()
.map_or((None, block.state_root()), |envelope| {
(Some(envelope.clone()), envelope.message.state_root)
})
}
} else {
(None, block.state_root())
};
// This branch is reached from the HTTP API. We assume the user wants
// to cache states so that future calls are faster.
let mut beacon_state = self
.store
.get_state(&beacon_state_root, Some(beacon_block.slot()), true)?
.ok_or_else(|| {
Error::DBInconsistent(format!("Missing state {:?}", beacon_state_root))
})?;
.get_state(&state_root, Some(block.slot()), true)?
.ok_or_else(|| Error::DBInconsistent(format!("Missing state {:?}", state_root)))?;
// This beacon state might come from the freezer DB, which means it could have pending
// updates or lots of untethered memory. We rebase it on the previous state in order to
@@ -6794,12 +6817,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
prev_beacon_state = Some(beacon_state.clone());
let snapshot = BeaconSnapshot {
beacon_block: Arc::new(beacon_block),
beacon_block_root,
beacon_block: block.clone(),
execution_envelope: opt_envelope,
beacon_block_root: *block_root,
beacon_state,
};
dump.push(snapshot);
}
Ok(dump)
}