add payload stuff

This commit is contained in:
Eitan Seri- Levi
2026-03-16 16:34:52 -07:00
parent 55fa3b321f
commit 9cfe66233f
6 changed files with 151 additions and 66 deletions

View File

@@ -3799,14 +3799,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}
AvailabilityOutcome::Payload(availability) => match availability {
PayloadAvailability::Available(available_payload_data) => {
PayloadAvailability::Available(available_envelope) => {
// TODO(gloas) execution publish_fn
// publish_fn()?;
// Payload data is fully available
let (block_root, data_columns) = *available_payload_data;
self.import_available_payload_data(block_root, data_columns)
// Payload envelope is fully available
let res = self
.import_available_execution_payload_envelope(available_envelope)
.await
.unwrap();
// TODO(gloas) unwrap
Ok(res)
}
PayloadAvailability::MissingComponents(block_root) => Ok(
AvailabilityProcessingStatus::MissingComponents(slot, block_root),
@@ -3815,17 +3819,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}
#[instrument(skip_all)]
pub async fn import_available_payload_data(
self: &Arc<Self>,
block_root: Hash256,
_data_columns: Vec<Arc<DataColumnSidecar<T::EthSpec>>>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
// TODO(gloas) this is just a stub implementation
// this function should mark payload data as available somehow
Ok(AvailabilityProcessingStatus::Imported(block_root))
}
#[instrument(skip_all)]
pub async fn import_available_block(
self: &Arc<Self>,

View File

@@ -3,6 +3,7 @@ use crate::data_availability_checker_v2::pending_components_cache::{
};
use crate::data_availability_checker::AvailabilityCheckError;
use crate::payload_envelope_verification::AvailableExecutedEnvelope;
use crate::{BeaconChain, BeaconChainTypes, CustodyContext, metrics};
use kzg::Kzg;
use slot_clock::SlotClock;
@@ -17,6 +18,7 @@ use types::{
SignedExecutionPayloadBid, Slot,
};
mod payload_envelope_cache;
mod pending_components_cache;
use crate::data_column_verification::{
@@ -45,7 +47,7 @@ pub type AvailableData<E> = (Hash256, DataColumnSidecarList<E>);
/// Indicates if the payloads data is fully `Available` or if we need more columns.
pub enum Availability<E: EthSpec> {
MissingComponents(Hash256),
Available(Box<AvailableData<E>>),
Available(Box<AvailableExecutedEnvelope<E>>),
}
impl<E: EthSpec> Debug for Availability<E> {
@@ -54,7 +56,8 @@ impl<E: EthSpec> Debug for Availability<E> {
Self::MissingComponents(block_root) => {
write!(f, "MissingComponents({})", block_root)
}
Self::Available(data) => write!(f, "Available({}, {} columns)", data.0, data.1.len()),
// TODO(gloas) fix success case
Self::Available(data) => todo!(),
}
}
}

View File

@@ -3,17 +3,26 @@ use crate::CustodyContext;
use crate::data_availability_checker::AvailabilityCheckError;
use crate::data_availability_checker_v2::Availability;
use crate::data_column_verification::KzgVerifiedCustodyDataColumn;
use crate::payload_envelope_verification::AvailabilityPendingExecutedEnvelope;
use crate::payload_envelope_verification::AvailableEnvelope;
use crate::payload_envelope_verification::AvailableExecutedEnvelope;
use lru::LruCache;
use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::cmp::Ordering;
use std::num::NonZeroUsize;
use std::sync::Arc;
use tracing::{Span, debug, debug_span};
use types::BlockImportSource;
use types::{
ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec, Hash256,
SignedExecutionPayloadBid,
SignedExecutionPayloadBid, SignedExecutionPayloadEnvelope,
};
pub enum CachedPayloadEnvelope<E: EthSpec> {
PreExecution(Arc<SignedExecutionPayloadEnvelope<E>>, BlockImportSource),
Executed(Box<AvailabilityPendingExecutedEnvelope<E>>),
}
/// This represents the components of a payload pending data availability.
///
/// The columns are all gossip and kzg verified.
@@ -24,9 +33,12 @@ pub struct PendingComponents<E: EthSpec> {
pub block_root: Hash256,
/// The execution payload bid containing blob_kzg_commitments.
pub bid: Option<Arc<SignedExecutionPayloadBid<E>>>,
/// a cached pre or post executed payload envelope
pub envelope: Option<CachedPayloadEnvelope<E>>,
pub verified_data_columns: Vec<KzgVerifiedCustodyDataColumn<E>>,
pub reconstruction_started: bool,
span: Span,
spec: Arc<ChainSpec>,
}
impl<E: EthSpec> PendingComponents<E> {
@@ -68,6 +80,19 @@ impl<E: EthSpec> PendingComponents<E> {
self.bid = Some(bid);
}
pub fn insert_pending_executed_envelope(
&mut self,
envelope: Arc<SignedExecutionPayloadEnvelope<E>>,
import_source: BlockImportSource,
) {
self.envelope = Some(CachedPayloadEnvelope::PreExecution(envelope, import_source))
}
/// Inserts an executed payload envelope into the cache.
pub fn insert_executed_envelope(&mut self, envelope: AvailabilityPendingExecutedEnvelope<E>) {
self.envelope = Some(CachedPayloadEnvelope::Executed(Box::new(envelope)))
}
/// Returns the number of blobs expected by reading the bid's kzg commitments.
/// Returns an error if the bid is not cached. This function should only be called
/// after ensuring that the bid has been cached.
@@ -80,66 +105,92 @@ impl<E: EthSpec> PendingComponents<E> {
Ok(bid.message.blob_kzg_commitments.len())
}
/// Returns `Some` if the bid and all required data columns have been received.
/// Returns `Some` if the envelope and all required data columns have been received.
pub fn make_available(
&self,
num_expected_columns: usize,
) -> Result<Option<DataColumnSidecarList<E>>, AvailabilityCheckError> {
// Check if we have a bid - if not, still waiting
) -> Result<Option<AvailableExecutedEnvelope<E>>, AvailabilityCheckError> {
// If no bid has been received, we can start verifying the columns
if self.bid.is_none() {
return Ok(None);
}
// Check if the payload has been received and executed
let Some(CachedPayloadEnvelope::Executed(envelope)) = self.envelope.as_ref() else {
return Ok(None);
};
let AvailabilityPendingExecutedEnvelope {
envelope,
import_data,
payload_verification_outcome,
} = envelope.as_ref();
// Get the number of blobs expected from the bid
let num_expected_blobs = self.num_blobs_expected()?;
if num_expected_blobs == 0 {
// No blobs expected, data is available (empty)
let columns = if num_expected_blobs == 0 {
self.span.in_scope(|| {
debug!("Bid has no blobs, data is available");
});
return Ok(Some(vec![]));
}
vec![]
} else {
let num_received_columns = self.verified_data_columns.len();
match num_received_columns.cmp(&num_expected_columns) {
Ordering::Greater => {
// Should never happen
return Err(AvailabilityCheckError::Unexpected(format!(
"too many columns got {num_received_columns} expected {num_expected_columns}"
)));
}
Ordering::Equal => {
// We have enough columns
let data_columns = self
.verified_data_columns
.iter()
.map(|d| d.clone().into_inner())
.collect::<Vec<_>>();
let num_received_columns = self.verified_data_columns.len();
match num_received_columns.cmp(&num_expected_columns) {
Ordering::Greater => {
// Should never happen
Err(AvailabilityCheckError::Unexpected(format!(
"too many columns got {num_received_columns} expected {num_expected_columns}"
)))
}
Ordering::Equal => {
// We have enough columns
let data_columns = self
.verified_data_columns
.iter()
.map(|d| d.clone().into_inner())
.collect::<Vec<_>>();
self.span.in_scope(|| {
debug!("All data columns received, data is available");
});
self.span.in_scope(|| {
debug!("All data columns received, data is available");
});
data_columns
}
Ordering::Less => {
// Not enough data columns received yet
return Ok(None);
}
}
};
Ok(Some(data_columns))
}
Ordering::Less => {
// Not enough data columns received yet
Ok(None)
}
}
let available_envelope = AvailableEnvelope {
execution_block_hash: envelope.block_hash(),
envelope: envelope.clone(),
columns,
columns_available_timestamp: None,
spec: self.spec.clone(),
};
Ok(Some(AvailableExecutedEnvelope {
envelope: available_envelope,
import_data: import_data.clone(),
payload_verification_outcome: payload_verification_outcome.clone(),
}))
}
/// Returns an empty `PendingComponents` object with the given block root.
pub fn empty(block_root: Hash256) -> Self {
pub fn empty(block_root: Hash256, spec: Arc<ChainSpec>) -> Self {
let span = debug_span!(parent: None, "lh_pending_components", %block_root);
let _guard = span.clone().entered();
Self {
block_root,
bid: None,
envelope: None,
verified_data_columns: vec![],
reconstruction_started: false,
span,
spec,
}
}
@@ -294,7 +345,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
pending_components: MappedRwLockReadGuard<'_, PendingComponents<T::EthSpec>>,
num_expected_columns: usize,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
if let Some(columns) = pending_components.make_available(num_expected_columns)? {
if let Some(available_envelope) = pending_components.make_available(num_expected_columns)? {
// Explicitly drop read lock before acquiring write lock
drop(pending_components);
if let Some(components) = self.critical.write().get_mut(&block_root) {
@@ -308,7 +359,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
// imported, but re-inserted immediately, causing partial pending components to be
// stored and served to peers.
// Components are only removed via LRU eviction as finality advances.
Ok(Availability::Available(Box::new((block_root, columns))))
Ok(Availability::Available(Box::new(available_envelope)))
} else {
Ok(Availability::MissingComponents(block_root))
}
@@ -330,8 +381,9 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
let mut write_lock = self.critical.write();
{
let pending_components =
write_lock.get_or_insert_mut(block_root, || PendingComponents::empty(block_root));
let pending_components = write_lock.get_or_insert_mut(block_root, || {
PendingComponents::empty(block_root, self.spec.clone())
});
update_fn(pending_components)?
}
@@ -433,6 +485,8 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
#[cfg(test)]
mod pending_components_tests {
use crate::test_utils::test_spec;
use super::*;
use types::MinimalEthSpec;
@@ -440,8 +494,9 @@ mod pending_components_tests {
#[test]
fn test_empty_pending_components() {
let spec = Arc::new(test_spec::<E>());
let block_root = Hash256::random();
let components = PendingComponents::<E>::empty(block_root);
let components = PendingComponents::<E>::empty(block_root, spec);
assert_eq!(components.block_root, block_root);
assert!(components.bid.is_none());
@@ -452,8 +507,9 @@ mod pending_components_tests {
#[test]
fn test_get_cached_data_columns_indices_empty() {
let spec = Arc::new(test_spec::<E>());
let block_root = Hash256::random();
let components = PendingComponents::<E>::empty(block_root);
let components = PendingComponents::<E>::empty(block_root, spec);
let indices = components.get_cached_data_columns_indices();
assert!(indices.is_empty());
@@ -461,8 +517,9 @@ mod pending_components_tests {
#[test]
fn test_status_str_no_bid() {
let spec = Arc::new(test_spec::<E>());
let block_root = Hash256::random();
let components = PendingComponents::<E>::empty(block_root);
let components = PendingComponents::<E>::empty(block_root, spec);
let status = components.status_str(10);
assert_eq!(status, "data_columns 0/10");
@@ -470,8 +527,9 @@ mod pending_components_tests {
#[test]
fn test_num_blobs_expected_no_bid() {
let spec = Arc::new(test_spec::<E>());
let block_root = Hash256::random();
let components = PendingComponents::<E>::empty(block_root);
let components = PendingComponents::<E>::empty(block_root, spec);
let result = components.num_blobs_expected();
assert!(result.is_err());
@@ -484,8 +542,9 @@ mod pending_components_tests {
#[test]
fn test_make_available_no_bid_returns_none() {
let spec = Arc::new(test_spec::<E>());
let block_root = Hash256::random();
let components = PendingComponents::<E>::empty(block_root);
let components = PendingComponents::<E>::empty(block_root, spec);
// Without a bid, make_available should return Ok(None)
let result = components.make_available(10);

View File

@@ -64,7 +64,9 @@ impl<E: EthSpec> AvailabilityOutcome<E> {
Self::Block(BlockAvailability::Available(block)) => block.import_data.block_root,
Self::Block(BlockAvailability::MissingComponents(root)) => *root,
// For payload availability, the first element of the tuple is the block root
Self::Payload(PayloadAvailability::Available(available_data)) => available_data.0,
Self::Payload(PayloadAvailability::Available(available_data)) => {
available_data.envelope.message().beacon_block_root
}
Self::Payload(PayloadAvailability::MissingComponents(root)) => *root,
}
}

View File

@@ -41,7 +41,7 @@ mod payload_notifier;
pub use execution_pending_envelope::ExecutionPendingEnvelope;
#[derive(PartialEq)]
#[derive(Clone, Debug, PartialEq)]
pub struct EnvelopeImportData<E: EthSpec> {
pub block_root: Hash256,
pub post_state: Box<BeaconState<E>>,
@@ -50,11 +50,11 @@ pub struct EnvelopeImportData<E: EthSpec> {
#[derive(Debug)]
#[allow(dead_code)]
pub struct AvailableEnvelope<E: EthSpec> {
execution_block_hash: ExecutionBlockHash,
envelope: Arc<SignedExecutionPayloadEnvelope<E>>,
columns: DataColumnSidecarList<E>,
pub execution_block_hash: ExecutionBlockHash,
pub envelope: Arc<SignedExecutionPayloadEnvelope<E>>,
pub columns: DataColumnSidecarList<E>,
/// Timestamp at which this envelope first became available (UNIX timestamp, time since 1970).
columns_available_timestamp: Option<std::time::Duration>,
pub columns_available_timestamp: Option<std::time::Duration>,
pub spec: Arc<ChainSpec>,
}
@@ -132,6 +132,33 @@ impl<E: EthSpec> ExecutedEnvelope<E> {
}
}
/// A payload ernvelope that has completed all envelope procesing checks, verification
/// by an EL client but does not have all requisite columns to get imported into
/// fork choice.
pub struct AvailabilityPendingExecutedEnvelope<E: EthSpec> {
pub envelope: Arc<SignedExecutionPayloadEnvelope<E>>,
pub import_data: EnvelopeImportData<E>,
pub payload_verification_outcome: PayloadVerificationOutcome,
}
impl<E: EthSpec> AvailabilityPendingExecutedEnvelope<E> {
pub fn new(
envelope: Arc<SignedExecutionPayloadEnvelope<E>>,
import_data: EnvelopeImportData<E>,
payload_verification_outcome: PayloadVerificationOutcome,
) -> Self {
Self {
envelope,
import_data,
payload_verification_outcome,
}
}
pub fn as_envelope(&self) -> &SignedExecutionPayloadEnvelope<E> {
&self.envelope
}
}
/// A payload envelope that has completed all payload processing checks including verification
/// by an EL client **and** has all requisite blob data to be imported into fork choice.
pub struct AvailableExecutedEnvelope<E: EthSpec> {