Merge branch 'gloas-serve-envelope-rpc' into epbs-devnet-0

This commit is contained in:
Eitan Seri- Levi
2026-03-03 20:11:41 -08:00
41 changed files with 1638 additions and 44 deletions

View File

@@ -28,6 +28,7 @@ use crate::envelope_times_cache::EnvelopeTimesCache;
use crate::errors::{BeaconChainError as Error, BlockProductionError};
use crate::events::ServerSentEventHandler;
use crate::execution_payload::{NotifyExecutionLayer, PreparePayloadHandle, get_execution_payload};
use crate::execution_payload_envelope_streamer::PayloadEnvelopeStreamer;
use crate::fetch_blobs::EngineGetBlobsOutput;
use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx};
use crate::graffiti_calculator::{GraffitiCalculator, GraffitiSettings};
@@ -664,7 +665,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.custody_context()
.as_ref()
.into();
debug!(?custody_context, "Persisting custody context to store");
// Pattern match to avoid accidentally missing fields and to ignore deprecated fields.
let CustodyContextSsz {
validator_custody_at_head,
epoch_validator_custody_requirements,
persisted_is_supernode: _,
} = &custody_context;
debug!(
validator_custody_at_head,
?epoch_validator_custody_requirements,
"Persisting custody context to store"
);
persist_custody_context::<T::EthSpec, T::HotStore, T::ColdStore>(
self.store.clone(),
@@ -1126,6 +1138,58 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map_or_else(|| self.get_blobs(block_root), Ok)
}
/// Returns the execution payload envelopes at the given roots, if any.
///
/// Will also check any associated caches. The expected use for this function is *only* for returning blocks requested
/// from P2P peers.
///
/// ## Errors
///
/// May return a database error.
#[allow(clippy::type_complexity)]
pub fn get_payload_envelopes_checking_caches(
self: &Arc<Self>,
block_roots: Vec<Hash256>,
) -> Result<
impl Stream<
Item = (
Hash256,
Arc<Result<Option<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>, Error>>,
),
>,
Error,
> {
Ok(PayloadEnvelopeStreamer::<T>::new(
self.execution_layer.clone(),
self.store.clone(),
self.task_executor.clone(),
CheckCaches::Yes,
)?
.launch_stream(block_roots))
}
#[allow(clippy::type_complexity)]
pub fn get_payload_envelopes(
self: &Arc<Self>,
block_roots: Vec<Hash256>,
) -> Result<
impl Stream<
Item = (
Hash256,
Arc<Result<Option<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>, Error>>,
),
>,
Error,
> {
Ok(PayloadEnvelopeStreamer::<T>::new(
self.execution_layer.clone(),
self.store.clone(),
self.task_executor.clone(),
CheckCaches::No,
)?
.launch_stream(block_roots))
}
pub fn get_data_columns_checking_all_caches(
&self,
block_root: Hash256,

View File

@@ -1092,6 +1092,11 @@ where
let cgc_change_effective_slot =
cgc_changed.effective_epoch.start_slot(E::slots_per_epoch());
beacon_chain.update_data_column_custody_info(Some(cgc_change_effective_slot));
// Persist change to disk.
beacon_chain
.persist_custody_context()
.map_err(|e| format!("Failed writing updated CGC: {e:?}"))?;
}
info!(

View File

@@ -326,6 +326,15 @@ pub enum BlockProductionError {
GloasNotImplemented(String),
}
impl From<task_executor::SpawnBlockingError> for BeaconChainError {
fn from(e: task_executor::SpawnBlockingError) -> Self {
match e {
task_executor::SpawnBlockingError::RuntimeShutdown => BeaconChainError::RuntimeShutdown,
task_executor::SpawnBlockingError::JoinError(e) => BeaconChainError::TokioJoin(e),
}
}
}
easy_from_to!(BlockProcessingError, BlockProductionError);
easy_from_to!(BeaconStateError, BlockProductionError);
easy_from_to!(SlotProcessingError, BlockProductionError);

View File

@@ -0,0 +1,151 @@
use std::sync::Arc;
use bls::Hash256;
use execution_layer::ExecutionLayer;
use futures::Stream;
use task_executor::TaskExecutor;
use tokio::sync::mpsc::{self, UnboundedSender};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::debug;
use types::{EthSpec, SignedExecutionPayloadEnvelope};
use crate::{BeaconChainError, BeaconChainTypes, BeaconStore, beacon_block_streamer::CheckCaches};
type PayloadEnvelopeResult<E> =
Result<Option<Arc<SignedExecutionPayloadEnvelope<E>>>, BeaconChainError>;
pub struct PayloadEnvelopeStreamer<T: BeaconChainTypes> {
// TODO(gloas) remove _ when we use the execution layer
// to load payload envelopes
_execution_layer: ExecutionLayer<T::EthSpec>,
store: BeaconStore<T>,
task_executor: TaskExecutor,
_check_caches: CheckCaches,
}
// TODO(gloas) eventually we'll need to expand this to support loading blinded payload envelopes from the db
// and fetching the execution payload from the EL. See BlockStreamer impl as an example
impl<T: BeaconChainTypes> PayloadEnvelopeStreamer<T> {
pub fn new(
execution_layer_opt: Option<ExecutionLayer<T::EthSpec>>,
store: BeaconStore<T>,
task_executor: TaskExecutor,
check_caches: CheckCaches,
) -> Result<Arc<Self>, BeaconChainError> {
let execution_layer = execution_layer_opt
.as_ref()
.ok_or(BeaconChainError::ExecutionLayerMissing)?
.clone();
Ok(Arc::new(Self {
_execution_layer: execution_layer,
store,
task_executor,
_check_caches: check_caches,
}))
}
// TODO(gloas) simply a stub impl for now. Should check some exec payload envelope cache
// and return the envelope if it exists in the cache
fn check_payload_envelope_cache(
&self,
_beacon_block_root: Hash256,
) -> Option<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>> {
// if self.check_caches == CheckCaches::Yes
None
}
async fn load_envelopes(
self: &Arc<Self>,
beacon_block_roots: &[Hash256],
) -> Result<Vec<(Hash256, PayloadEnvelopeResult<T::EthSpec>)>, BeaconChainError> {
let streamer = self.clone();
let roots = beacon_block_roots.to_vec();
// Loading from the DB is slow -> spawn a blocking task
self.task_executor
.spawn_blocking_and_await(
move || {
let mut results = Vec::new();
for root in roots {
if let Some(cached) = streamer.check_payload_envelope_cache(root) {
results.push((root, Ok(Some(cached))));
continue;
}
// TODO(gloas) we'll want to use the execution layer directly to call
// the engine api method eth_getBlockByHash()
match streamer.store.get_payload_envelope(&root) {
Ok(opt_envelope) => {
results.push((root, Ok(opt_envelope.map(Arc::new))));
}
Err(e) => {
results.push((root, Err(BeaconChainError::DBError(e))));
}
}
}
results
},
"load_execution_payload_envelopes",
)
.await
.map_err(BeaconChainError::from)
}
async fn stream_payload_envelopes(
self: Arc<Self>,
beacon_block_roots: Vec<Hash256>,
sender: UnboundedSender<(Hash256, Arc<PayloadEnvelopeResult<T::EthSpec>>)>,
) {
let results = match self.load_envelopes(&beacon_block_roots).await {
Ok(results) => results,
Err(e) => {
send_errors(beacon_block_roots, sender, e).await;
return;
}
};
for (root, result) in results {
if sender.send((root, Arc::new(result))).is_err() {
break;
}
}
}
pub async fn stream(
self: Arc<Self>,
beacon_block_roots: Vec<Hash256>,
sender: UnboundedSender<(Hash256, Arc<PayloadEnvelopeResult<T::EthSpec>>)>,
) {
self.stream_payload_envelopes(beacon_block_roots, sender)
.await;
}
pub fn launch_stream(
self: Arc<Self>,
beacon_block_roots: Vec<Hash256>,
) -> impl Stream<Item = (Hash256, Arc<PayloadEnvelopeResult<T::EthSpec>>)> {
let (envelope_tx, envelope_rx) = mpsc::unbounded_channel();
debug!(
envelopes = beacon_block_roots.len(),
"Launching a PayloadEnvelopeStreamer"
);
let executor = self.task_executor.clone();
executor.spawn(
self.stream(beacon_block_roots, envelope_tx),
"get_payload_envelopes_sender",
);
UnboundedReceiverStream::new(envelope_rx)
}
}
async fn send_errors<E: EthSpec>(
beacon_block_roots: Vec<Hash256>,
sender: UnboundedSender<(Hash256, Arc<PayloadEnvelopeResult<E>>)>,
beacon_chain_error: BeaconChainError,
) {
let result = Arc::new(Err(beacon_chain_error));
for beacon_block_root in beacon_block_roots {
if sender.send((beacon_block_root, result.clone())).is_err() {
break;
}
}
}

View File

@@ -25,6 +25,7 @@ pub mod envelope_times_cache;
mod errors;
pub mod events;
pub mod execution_payload;
pub mod execution_payload_envelope_streamer;
pub mod fetch_blobs;
pub mod fork_choice_signal;
pub mod graffiti_calculator;