Move gloas http logic to modules.

This commit is contained in:
Jimmy Chen
2026-02-05 12:25:40 +11:00
parent f9bfaf9da6
commit 339ba6e143
10 changed files with 321 additions and 258 deletions

View File

@@ -0,0 +1,127 @@
use crate::task_spawner::{Priority, TaskSpawner};
use crate::utils::{ChainFilter, EthV1Filter, NetworkTxFilter, ResponseFilter, TaskSpawnerFilter};
use beacon_chain::{BeaconChain, BeaconChainTypes};
use bytes::Bytes;
use eth2::{CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER};
use lighthouse_network::PubsubMessage;
use network::NetworkMessage;
use ssz::Decode;
use std::sync::Arc;
use tokio::sync::mpsc::UnboundedSender;
use tracing::{info, warn};
use types::SignedExecutionPayloadEnvelope;
use warp::{Filter, Rejection, Reply, reply::Response};
// POST beacon/execution_payload_envelope (SSZ)
pub(crate) fn post_beacon_execution_payload_envelope_ssz<T: BeaconChainTypes>(
eth_v1: EthV1Filter,
task_spawner_filter: TaskSpawnerFilter<T>,
chain_filter: ChainFilter<T>,
network_tx_filter: NetworkTxFilter<T>,
) -> ResponseFilter {
eth_v1
.and(warp::path("beacon"))
.and(warp::path("execution_payload_envelope"))
.and(warp::path::end())
.and(warp::header::exact(
CONTENT_TYPE_HEADER,
SSZ_CONTENT_TYPE_HEADER,
))
.and(warp::body::bytes())
.and(task_spawner_filter)
.and(chain_filter)
.and(network_tx_filter)
.then(
|body_bytes: Bytes,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
let envelope =
SignedExecutionPayloadEnvelope::<T::EthSpec>::from_ssz_bytes(&body_bytes)
.map_err(|e| {
warp_utils::reject::custom_bad_request(format!("invalid SSZ: {e:?}"))
})?;
publish_execution_payload_envelope(envelope, chain, &network_tx).await
})
},
)
.boxed()
}
// POST beacon/execution_payload_envelope
pub(crate) fn post_beacon_execution_payload_envelope<T: BeaconChainTypes>(
eth_v1: EthV1Filter,
task_spawner_filter: TaskSpawnerFilter<T>,
chain_filter: ChainFilter<T>,
network_tx_filter: NetworkTxFilter<T>,
) -> ResponseFilter {
eth_v1
.clone()
.and(warp::path("beacon"))
.and(warp::path("execution_payload_envelope"))
.and(warp::path::end())
.and(warp::body::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.then(
|envelope: SignedExecutionPayloadEnvelope<T::EthSpec>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
publish_execution_payload_envelope(envelope, chain, &network_tx).await
})
},
)
.boxed()
}
/// Publishes a signed execution payload envelope to the network.
pub async fn publish_execution_payload_envelope<T: BeaconChainTypes>(
envelope: SignedExecutionPayloadEnvelope<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: &UnboundedSender<NetworkMessage<T::EthSpec>>,
) -> Result<Response, Rejection> {
let slot = envelope.message.slot;
let beacon_block_root = envelope.message.beacon_block_root;
// Basic validation: check that the slot is reasonable
let current_slot = chain.slot().map_err(|_| {
warp_utils::reject::custom_server_error("Unable to get current slot".into())
})?;
// Don't accept envelopes too far in the future
if slot > current_slot + 1 {
return Err(warp_utils::reject::custom_bad_request(format!(
"Envelope slot {} is too far in the future (current slot: {})",
slot, current_slot
)));
}
// TODO(gloas): Add more validation:
// - Verify the signature
// - Check builder_index is valid
// - Verify the envelope references a known block
info!(
%slot,
%beacon_block_root,
builder_index = envelope.message.builder_index,
"Publishing signed execution payload envelope to network"
);
// Publish to the network
crate::utils::publish_pubsub_message(
network_tx,
PubsubMessage::ExecutionPayload(Box::new(envelope)),
)
.map_err(|_| {
warn!(%slot, "Failed to publish execution payload envelope to network");
warp_utils::reject::custom_server_error(
"Unable to publish execution payload envelope to network".into(),
)
})?;
Ok(warp::reply().into_response())
}

View File

@@ -1,2 +1,3 @@
pub mod execution_payload_envelope;
pub mod pool;
pub mod states;

View File

@@ -28,7 +28,6 @@ pub fn get_beacon_state_pending_consolidations<T: BeaconChainTypes>(
beacon_states_path: BeaconStatesPath<T>,
) -> ResponseFilter {
beacon_states_path
.clone()
.and(warp::path("pending_consolidations"))
.and(warp::path::end())
.then(