Fixes based on feedback

This commit is contained in:
Eitan Seri- Levi
2026-03-03 18:32:57 -08:00
parent 0212e84fa8
commit 8378f7d294
9 changed files with 186 additions and 60 deletions

View File

@@ -325,6 +325,15 @@ pub enum BlockProductionError {
GloasNotImplemented(String),
}
impl From<task_executor::SpawnBlockingError> for BeaconChainError {
fn from(e: task_executor::SpawnBlockingError) -> Self {
match e {
task_executor::SpawnBlockingError::RuntimeShutdown => BeaconChainError::RuntimeShutdown,
task_executor::SpawnBlockingError::JoinError(e) => BeaconChainError::TokioJoin(e),
}
}
}
easy_from_to!(BlockProcessingError, BlockProductionError);
easy_from_to!(BeaconStateError, BlockProductionError);
easy_from_to!(SlotProcessingError, BlockProductionError);

View File

@@ -15,7 +15,9 @@ type PayloadEnvelopeResult<E> =
Result<Option<Arc<SignedExecutionPayloadEnvelope<E>>>, BeaconChainError>;
pub struct PayloadEnvelopeStreamer<T: BeaconChainTypes> {
execution_layer: ExecutionLayer<T::EthSpec>,
// TODO(gloas) remove _ when we use the execution layer
// to load payload envelopes
_execution_layer: ExecutionLayer<T::EthSpec>,
store: BeaconStore<T>,
task_executor: TaskExecutor,
_check_caches: CheckCaches,
@@ -36,7 +38,7 @@ impl<T: BeaconChainTypes> PayloadEnvelopeStreamer<T> {
.clone();
Ok(Arc::new(Self {
execution_layer,
_execution_layer: execution_layer,
store,
task_executor,
_check_caches: check_caches,
@@ -53,31 +55,56 @@ impl<T: BeaconChainTypes> PayloadEnvelopeStreamer<T> {
None
}
// used when the execution engine doesn't support the payload bodies methods
async fn stream_payload_envelopes_fallback(
async fn load_envelopes(
self: &Arc<Self>,
beacon_block_roots: &[Hash256],
) -> Result<Vec<(Hash256, PayloadEnvelopeResult<T::EthSpec>)>, BeaconChainError> {
let streamer = self.clone();
let roots = beacon_block_roots.to_vec();
// Loading from the DB is slow -> spawn a blocking task
self.task_executor
.spawn_blocking_and_await(
move || {
let mut results = Vec::new();
for root in roots {
if let Some(cached) = streamer.check_payload_envelope_cache(root) {
results.push((root, Ok(Some(cached))));
continue;
}
// TODO(gloas) we'll want to use the execution layer directly to call
// the engine api method eth_getBlockByHash()
match streamer.store.get_payload_envelope(&root) {
Ok(opt_envelope) => {
results.push((root, Ok(opt_envelope.map(Arc::new))));
}
Err(e) => {
results.push((root, Err(BeaconChainError::DBError(e))));
}
}
}
results
},
"load_execution_payload_envelopes",
)
.await
.map_err(BeaconChainError::from)
}
async fn stream_payload_envelopes(
self: Arc<Self>,
beacon_block_roots: Vec<Hash256>,
sender: UnboundedSender<(Hash256, Arc<PayloadEnvelopeResult<T::EthSpec>>)>,
) {
debug!("Using slower fallback method of eth_getBlockByHash()");
for beacon_block_root in beacon_block_roots {
let cached_envelope = self.check_payload_envelope_cache(beacon_block_root);
let results = match self.load_envelopes(&beacon_block_roots).await {
Ok(results) => results,
Err(e) => {
send_errors(beacon_block_roots, sender, e).await;
return;
}
};
let envelope_result = if cached_envelope.is_some() {
Ok(cached_envelope)
} else {
// TODO(gloas) we'll want to use the execution layer directly to call
// the engine api method eth_getBlockByHash()
self.store
.get_payload_envelope(&beacon_block_root)
.map(|opt_envelope| opt_envelope.map(Arc::new))
.map_err(BeaconChainError::DBError)
};
if sender
.send((beacon_block_root, Arc::new(envelope_result)))
.is_err()
{
for (root, result) in results {
if sender.send((root, Arc::new(result))).is_err() {
break;
}
}
@@ -88,22 +115,8 @@ impl<T: BeaconChainTypes> PayloadEnvelopeStreamer<T> {
beacon_block_roots: Vec<Hash256>,
sender: UnboundedSender<(Hash256, Arc<PayloadEnvelopeResult<T::EthSpec>>)>,
) {
match self
.execution_layer
.get_engine_capabilities(None)
.await
.map_err(Box::new)
.map_err(BeaconChainError::EngineGetCapabilititesFailed)
{
Ok(_engine_capabilities) => {
// TODO(gloas) should check engine capabilities for get_payload_bodies_by_range_v1
self.stream_payload_envelopes_fallback(beacon_block_roots, sender)
.await;
}
Err(e) => {
send_errors(beacon_block_roots, sender, e).await;
}
}
self.stream_payload_envelopes(beacon_block_roots, sender)
.await;
}
pub fn launch_stream(

View File

@@ -528,9 +528,7 @@ impl PayloadEnvelopesByRootRequest {
beacon_block_roots: Vec<Hash256>,
fork_context: &ForkContext,
) -> Result<Self, String> {
let max_requests_envelopes = fork_context
.spec
.max_request_payloads(fork_context.current_fork_name());
let max_requests_envelopes = fork_context.spec.max_request_payloads();
let beacon_block_roots =
RuntimeVariableList::new(beacon_block_roots, max_requests_envelopes).map_err(|e| {

View File

@@ -22,7 +22,7 @@ use types::{
LightClientBootstrap, LightClientBootstrapAltair, LightClientFinalityUpdate,
LightClientFinalityUpdateAltair, LightClientOptimisticUpdate,
LightClientOptimisticUpdateAltair, LightClientUpdate, MainnetEthSpec, MinimalEthSpec,
SignedBeaconBlock,
SignedBeaconBlock, SignedExecutionPayloadEnvelope,
};
// Note: Hardcoding the `EthSpec` type for `SignedBeaconBlock` as min/max values is
@@ -65,6 +65,12 @@ pub static SIGNED_BEACON_BLOCK_BELLATRIX_MAX: LazyLock<usize> =
+ types::ExecutionPayload::<MainnetEthSpec>::max_execution_payload_bellatrix_size() // adding max size of execution payload (~16gb)
+ ssz::BYTES_PER_LENGTH_OFFSET); // Adding the additional ssz offset for the `ExecutionPayload` field
pub static SIGNED_EXECUTION_PAYLOAD_ENVELOPE_MIN: LazyLock<usize> =
LazyLock::new(SignedExecutionPayloadEnvelope::<MainnetEthSpec>::min_size);
pub static SIGNED_EXECUTION_PAYLOAD_ENVELOPE_MAX: LazyLock<usize> =
LazyLock::new(SignedExecutionPayloadEnvelope::<MainnetEthSpec>::max_size);
pub static BLOB_SIDECAR_SIZE: LazyLock<usize> =
LazyLock::new(BlobSidecar::<MainnetEthSpec>::max_size);
@@ -147,6 +153,14 @@ pub fn rpc_block_limits_by_fork(current_fork: ForkName) -> RpcLimits {
}
}
/// Returns the rpc limits for payload_envelope_by_range and payload_envelope_by_root responses.
pub fn rpc_payload_limits() -> RpcLimits {
RpcLimits::new(
*SIGNED_EXECUTION_PAYLOAD_ENVELOPE_MIN,
*SIGNED_EXECUTION_PAYLOAD_ENVELOPE_MAX,
)
}
fn rpc_light_client_updates_by_range_limits_by_fork(current_fork: ForkName) -> RpcLimits {
let altair_fixed_len = LightClientFinalityUpdateAltair::<MainnetEthSpec>::ssz_fixed_len();
@@ -425,8 +439,14 @@ impl SupportedProtocol {
}
if fork_context.fork_exists(ForkName::Gloas) {
supported.extend_from_slice(&[
ProtocolId::new(SupportedProtocol::PayloadEnvelopesByRangeV1, Encoding::SSZSnappy),
ProtocolId::new(SupportedProtocol::PayloadEnvelopesByRootV1, Encoding::SSZSnappy),
ProtocolId::new(
SupportedProtocol::PayloadEnvelopesByRangeV1,
Encoding::SSZSnappy,
),
ProtocolId::new(
SupportedProtocol::PayloadEnvelopesByRootV1,
Encoding::SSZSnappy,
),
]);
}
supported
@@ -535,7 +555,9 @@ impl ProtocolId {
<PayloadEnvelopesByRangeRequest as Encode>::ssz_fixed_len(),
<PayloadEnvelopesByRangeRequest as Encode>::ssz_fixed_len(),
),
Protocol::PayloadEnvelopesByRoot => RpcLimits::new(0, spec.max_blocks_by_root_request),
Protocol::PayloadEnvelopesByRoot => {
RpcLimits::new(0, spec.max_payload_envelopes_by_root_request)
}
Protocol::BlobsByRange => RpcLimits::new(
<BlobsByRangeRequest as Encode>::ssz_fixed_len(),
<BlobsByRangeRequest as Encode>::ssz_fixed_len(),
@@ -577,9 +599,7 @@ impl ProtocolId {
Protocol::PayloadEnvelopesByRange => {
rpc_block_limits_by_fork(fork_context.current_fork_name())
}
Protocol::PayloadEnvelopesByRoot => {
rpc_block_limits_by_fork(fork_context.current_fork_name())
}
Protocol::PayloadEnvelopesByRoot => rpc_payload_limits(),
Protocol::BlobsByRange => rpc_blob_limits::<E>(),
Protocol::BlobsByRoot => rpc_blob_limits::<E>(),
Protocol::DataColumnsByRoot => {

View File

@@ -2160,7 +2160,7 @@ async fn test_payload_envelopes_by_range() {
let slot_count = 32;
// Manually store payload envelopes for each block in the range
let mut expected_count = 0;
let mut expected_roots = Vec::new();
for slot in start_slot..slot_count {
if let Some(root) = rig
.chain
@@ -2172,13 +2172,13 @@ async fn test_payload_envelopes_by_range() {
.store
.put_payload_envelope(&root, envelope)
.unwrap();
expected_count += 1;
expected_roots.push(root);
}
}
rig.enqueue_payload_envelopes_by_range_request(start_slot, slot_count);
let mut actual_count = 0;
let mut actual_roots = Vec::new();
while let Some(next) = rig.network_rx.recv().await {
if let NetworkMessage::SendResponse {
peer_id: _,
@@ -2186,8 +2186,8 @@ async fn test_payload_envelopes_by_range() {
inbound_request_id: _,
} = next
{
if envelope.is_some() {
actual_count += 1;
if let Some(env) = envelope {
actual_roots.push(env.beacon_block_root());
} else {
break;
}
@@ -2198,7 +2198,7 @@ async fn test_payload_envelopes_by_range() {
panic!("unexpected message {:?}", next);
}
}
assert_eq!(expected_count, actual_count);
assert_eq!(expected_roots, actual_roots);
}
#[tokio::test]
@@ -2226,7 +2226,7 @@ async fn test_payload_envelopes_by_root() {
let roots = RuntimeVariableList::new(vec![block_root], 1).unwrap();
rig.enqueue_payload_envelopes_by_root_request(roots);
let mut actual_count = 0;
let mut actual_roots = Vec::new();
while let Some(next) = rig.network_rx.recv().await {
if let NetworkMessage::SendResponse {
peer_id: _,
@@ -2234,8 +2234,8 @@ async fn test_payload_envelopes_by_root() {
inbound_request_id: _,
} = next
{
if envelope.is_some() {
actual_count += 1;
if let Some(env) = envelope {
actual_roots.push(env.beacon_block_root());
} else {
break;
}
@@ -2243,7 +2243,7 @@ async fn test_payload_envelopes_by_root() {
panic!("unexpected message {:?}", next);
}
}
assert_eq!(1, actual_count);
assert_eq!(vec![block_root], actual_roots);
}
#[tokio::test]