Merge remote-tracking branch 'upstream/unstable' into gloas-containers

This commit is contained in:
Mark Mackey
2025-10-15 10:12:48 -05:00
175 changed files with 6812 additions and 4540 deletions

View File

@@ -43,7 +43,7 @@ use tokio::{
time::sleep,
};
use tokio_stream::wrappers::WatchStream;
use tracing::{debug, error, info, warn};
use tracing::{Instrument, debug, debug_span, error, info, instrument, warn};
use tree_hash::TreeHash;
use types::beacon_block_body::KzgCommitments;
use types::builder_bid::BuilderBid;
@@ -844,6 +844,7 @@ impl<E: EthSpec> ExecutionLayer<E> {
}
/// Returns the fee-recipient address that should be used to build a block
#[instrument(level = "debug", skip_all)]
pub async fn get_suggested_fee_recipient(&self, proposer_index: u64) -> Address {
if let Some(preparation_data_entry) =
self.proposer_preparation_data().await.get(&proposer_index)
@@ -868,6 +869,7 @@ impl<E: EthSpec> ExecutionLayer<E> {
}
}
#[instrument(level = "debug", skip_all)]
pub async fn get_proposer_gas_limit(&self, proposer_index: u64) -> Option<u64> {
self.proposer_preparation_data()
.await
@@ -884,6 +886,7 @@ impl<E: EthSpec> ExecutionLayer<E> {
///
/// The result will be returned from the first node that returns successfully. No more nodes
/// will be contacted.
#[instrument(level = "debug", skip_all)]
pub async fn get_payload(
&self,
payload_parameters: PayloadParameters<'_>,
@@ -989,6 +992,7 @@ impl<E: EthSpec> ExecutionLayer<E> {
timed_future(metrics::GET_BLINDED_PAYLOAD_BUILDER, async {
builder
.get_builder_header::<E>(slot, parent_hash, pubkey)
.instrument(debug_span!("get_builder_header"))
.await
}),
timed_future(metrics::GET_BLINDED_PAYLOAD_LOCAL, async {
@@ -1230,6 +1234,7 @@ impl<E: EthSpec> ExecutionLayer<E> {
.await
}
#[instrument(level = "debug", skip_all)]
async fn get_full_payload_with(
&self,
payload_parameters: PayloadParameters<'_>,
@@ -1905,9 +1910,19 @@ impl<E: EthSpec> ExecutionLayer<E> {
) -> Result<SubmitBlindedBlockResponse<E>, Error> {
debug!(?block_root, "Sending block to builder");
if spec.is_fulu_scheduled() {
self.post_builder_blinded_blocks_v2(block_root, block)
let resp = self
.post_builder_blinded_blocks_v2(block_root, block)
.await
.map(|()| SubmitBlindedBlockResponse::V2)
.map(|()| SubmitBlindedBlockResponse::V2);
// Fallback to v1 if v2 fails because the relay doesn't support it.
// Note: we should remove the fallback post fulu when all relays have support for v2.
if resp.is_err() {
self.post_builder_blinded_blocks_v1(block_root, block)
.await
.map(|full_payload| SubmitBlindedBlockResponse::V1(Box::new(full_payload)))
} else {
resp
}
} else {
self.post_builder_blinded_blocks_v1(block_root, block)
.await
@@ -2023,7 +2038,9 @@ impl<E: EthSpec> ExecutionLayer<E> {
relay_response_ms = duration.as_millis(),
?block_root,
"Successfully submitted blinded block to the builder"
)
);
Ok(())
}
Err(e) => {
metrics::inc_counter_vec(
@@ -2036,11 +2053,10 @@ impl<E: EthSpec> ExecutionLayer<E> {
relay_response_ms = duration.as_millis(),
?block_root,
"Failed to submit blinded block to the builder"
)
);
Err(e)
}
}
Ok(())
} else {
Err(Error::NoPayloadBuilder)
}

View File

@@ -3,8 +3,8 @@ use crate::{Config, ExecutionLayer, PayloadAttributes, PayloadParameters};
use bytes::Bytes;
use eth2::types::PublishBlockRequest;
use eth2::types::{
BlobsBundle, BlockId, BroadcastValidation, EventKind, EventTopic, FullPayloadContents,
ProposerData, StateId, ValidatorId,
BlobsBundle, BlockId, BroadcastValidation, EndpointVersion, EventKind, EventTopic,
FullPayloadContents, ProposerData, StateId, ValidatorId,
};
use eth2::{
BeaconNodeHttpClient, CONSENSUS_VERSION_HEADER, CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER,
@@ -307,6 +307,10 @@ pub struct MockBuilder<E: EthSpec> {
payload_id_cache: Arc<RwLock<HashMap<ExecutionBlockHash, PayloadParametersCloned>>>,
/// If set to `true`, sets the bid returned by `get_header` to Uint256::MAX
max_bid: bool,
/// Broadcast the full block with payload to the attached beacon node (simulating the relay).
///
/// Turning this off is useful for testing.
broadcast_to_bn: bool,
/// A cache that stores the proposers index for a given epoch
proposers_cache: Arc<RwLock<HashMap<Epoch, Vec<ProposerData>>>>,
}
@@ -315,6 +319,9 @@ impl<E: EthSpec> MockBuilder<E> {
pub fn new_for_testing(
mock_el_url: SensitiveUrl,
beacon_url: SensitiveUrl,
validate_pubkey: bool,
apply_operations: bool,
broadcast_to_bn: bool,
spec: Arc<ChainSpec>,
executor: TaskExecutor,
) -> (Self, (SocketAddr, impl Future<Output = ()>)) {
@@ -332,12 +339,15 @@ impl<E: EthSpec> MockBuilder<E> {
let el = ExecutionLayer::from_config(config, executor.clone()).unwrap();
let max_bid = false;
let builder = MockBuilder::new(
el,
BeaconNodeHttpClient::new(beacon_url, Timeouts::set_all(Duration::from_secs(1))),
true,
true,
false,
validate_pubkey,
apply_operations,
broadcast_to_bn,
max_bid,
spec,
None,
);
@@ -353,6 +363,7 @@ impl<E: EthSpec> MockBuilder<E> {
beacon_client: BeaconNodeHttpClient,
validate_pubkey: bool,
apply_operations: bool,
broadcast_to_bn: bool,
max_bid: bool,
spec: Arc<ChainSpec>,
sk: Option<&[u8]>,
@@ -382,6 +393,7 @@ impl<E: EthSpec> MockBuilder<E> {
proposers_cache: Arc::new(RwLock::new(HashMap::new())),
apply_operations,
max_bid,
broadcast_to_bn,
genesis_time: None,
}
}
@@ -462,14 +474,20 @@ impl<E: EthSpec> MockBuilder<E> {
return Err("invalid fork".to_string());
}
};
let block_hash = block
.message()
.body()
.execution_payload()
.unwrap()
.block_hash();
info!(
block_hash = %root,
execution_payload_root = %root,
?block_hash,
"Submitting blinded beacon block to builder"
);
let payload = self
.el
.get_payload_by_root(&root)
.ok_or_else(|| "missing payload for tx root".to_string())?;
let payload = self.el.get_payload_by_root(&root).ok_or_else(|| {
format!("missing payload for root: {root:?}, block_hash: {block_hash:?}",)
})?;
let (payload, blobs) = payload.deconstruct();
let full_block = block
@@ -478,16 +496,28 @@ impl<E: EthSpec> MockBuilder<E> {
debug!(
txs_count = payload.transactions().len(),
blob_count = blobs.as_ref().map(|b| b.commitments.len()),
"Got full payload, sending to local beacon node for propagation"
"Got full payload"
);
let publish_block_request = PublishBlockRequest::new(
Arc::new(full_block),
blobs.clone().map(|b| (b.proofs, b.blobs)),
);
self.beacon_client
.post_beacon_blocks_v2(&publish_block_request, Some(BroadcastValidation::Gossip))
.await
.map_err(|e| format!("Failed to post blinded block {:?}", e))?;
if self.broadcast_to_bn {
debug!(
block_hash = ?payload.block_hash(),
"Broadcasting builder block to BN"
);
let publish_block_request = PublishBlockRequest::new(
Arc::new(full_block),
blobs.clone().map(|b| (b.proofs, b.blobs)),
);
self.beacon_client
.post_beacon_blocks_v2(
&publish_block_request,
Some(BroadcastValidation::ConsensusAndEquivocation),
)
.await
.map_err(|e| {
// XXX: this should really be a 400 but warp makes that annoyingly difficult
format!("Failed to post blinded block {e:?}")
})?;
}
Ok(FullPayloadContents::new(payload, blobs))
}
@@ -518,16 +548,29 @@ impl<E: EthSpec> MockBuilder<E> {
info!("Got payload params");
let fork = self.fork_name_at_slot(slot);
let payload_response_type = self
.el
.get_full_payload_caching(PayloadParameters {
parent_hash: payload_parameters.parent_hash,
parent_gas_limit: payload_parameters.parent_gas_limit,
proposer_gas_limit: payload_parameters.proposer_gas_limit,
payload_attributes: &payload_parameters.payload_attributes,
forkchoice_update_params: &payload_parameters.forkchoice_update_params,
current_fork: payload_parameters.current_fork,
})
.get_full_payload_with(
PayloadParameters {
parent_hash: payload_parameters.parent_hash,
parent_gas_limit: payload_parameters.parent_gas_limit,
proposer_gas_limit: payload_parameters.proposer_gas_limit,
payload_attributes: &payload_parameters.payload_attributes,
forkchoice_update_params: &payload_parameters.forkchoice_update_params,
current_fork: payload_parameters.current_fork,
},
// If apply_operations is set, do NOT cache the payload at this point, we are about
// to mutate it and it would be incorrect to cache the unmutated payload.
//
// This is a flaw in apply_operations generally, if you want the mock builder to
// actually return payloads then this option should be turned off.
if self.apply_operations {
|_, _| None
} else {
ExecutionLayer::cache_payload
},
)
.await
.map_err(|e| format!("couldn't get payload {:?}", e))?;
@@ -926,11 +969,21 @@ pub fn serve<E: EthSpec>(
let inner_ctx = builder.clone();
let ctx_filter = warp::any().map(move || inner_ctx.clone());
let prefix = warp::path("eth")
let prefix_v1 = warp::path("eth")
.and(warp::path("v1"))
.and(warp::path("builder"));
let validators = prefix
let prefix_either = warp::path("eth")
.and(
warp::path::param::<EndpointVersion>().or_else(|_| async move {
Err(warp::reject::custom(Custom(
"Invalid EndpointVersion".to_string(),
)))
}),
)
.and(warp::path("builder"));
let validators = prefix_v1
.and(warp::path("validators"))
.and(warp::body::json())
.and(warp::path::end())
@@ -942,61 +995,89 @@ pub fn serve<E: EthSpec>(
.register_validators(registrations)
.await
.map_err(|e| warp::reject::custom(Custom(e)))?;
Ok::<_, Rejection>(warp::reply())
},
)
.boxed();
let blinded_block_ssz = prefix
.and(warp::path("blinded_blocks"))
.and(warp::body::bytes())
.and(warp::header::header::<ForkName>(CONSENSUS_VERSION_HEADER))
.and(warp::path::end())
.and(ctx_filter.clone())
.and_then(
|block_bytes: Bytes, fork_name: ForkName, builder: MockBuilder<E>| async move {
let block =
SignedBlindedBeaconBlock::<E>::from_ssz_bytes_by_fork(&block_bytes, fork_name)
.map_err(|e| warp::reject::custom(Custom(format!("{:?}", e))))?;
let payload = builder
.submit_blinded_block(block)
.await
.map_err(|e| warp::reject::custom(Custom(e)))?;
Ok::<_, warp::reject::Rejection>(
warp::http::Response::builder()
.status(200)
.body(payload.as_ssz_bytes())
.map(add_ssz_content_type_header)
.map(|res| add_consensus_version_header(res, fork_name))
.unwrap(),
)
Ok::<_, Rejection>(warp::reply().into_response())
},
);
let blinded_block =
prefix
let blinded_block_ssz =
prefix_either
.and(warp::path("blinded_blocks"))
.and(warp::body::json())
.and(warp::body::bytes())
.and(warp::header::header::<ForkName>(CONSENSUS_VERSION_HEADER))
.and(warp::path::end())
.and(ctx_filter.clone())
.and_then(
|block: SignedBlindedBeaconBlock<E>,
|endpoint_version,
block_bytes: Bytes,
fork_name: ForkName,
builder: MockBuilder<E>| async move {
if endpoint_version != EndpointVersion(1)
&& endpoint_version != EndpointVersion(2)
{
return Err(warp::reject::custom(Custom(format!(
"Unsupported version: {endpoint_version}"
))));
}
let block = SignedBlindedBeaconBlock::<E>::from_ssz_bytes_by_fork(
&block_bytes,
fork_name,
)
.map_err(|e| warp::reject::custom(Custom(format!("{:?}", e))))?;
let payload = builder
.submit_blinded_block(block)
.await
.map_err(|e| warp::reject::custom(Custom(e)))?;
let resp: ForkVersionedResponse<_> = ForkVersionedResponse {
version: fork_name,
metadata: Default::default(),
data: payload,
};
let json_payload = serde_json::to_string(&resp)
.map_err(|_| reject("coudn't serialize response"))?;
if endpoint_version == EndpointVersion(1) {
Ok::<_, warp::reject::Rejection>(
warp::http::Response::builder()
.status(200)
.body(payload.as_ssz_bytes())
.map(add_ssz_content_type_header)
.map(|res| add_consensus_version_header(res, fork_name))
.unwrap(),
)
} else {
Ok(warp::http::Response::builder()
.status(202)
.body(&[] as &'static [u8])
.map(|res| add_consensus_version_header(res, fork_name))
.unwrap())
}
},
);
let blinded_block = prefix_either
.and(warp::path("blinded_blocks"))
.and(warp::body::json())
.and(warp::header::header::<ForkName>(CONSENSUS_VERSION_HEADER))
.and(warp::path::end())
.and(ctx_filter.clone())
.and_then(
|endpoint_version,
block: SignedBlindedBeaconBlock<E>,
fork_name: ForkName,
builder: MockBuilder<E>| async move {
if endpoint_version != EndpointVersion(1) && endpoint_version != EndpointVersion(2)
{
return Err(warp::reject::custom(Custom(format!(
"Unsupported version: {endpoint_version}"
))));
}
let payload = builder
.submit_blinded_block(block)
.await
.map_err(|e| warp::reject::custom(Custom(e)))?;
let resp: ForkVersionedResponse<_> = ForkVersionedResponse {
version: fork_name,
metadata: Default::default(),
data: payload,
};
let json_payload = serde_json::to_string(&resp)
.map_err(|_| reject("coudn't serialize response"))?;
if endpoint_version == EndpointVersion(1) {
Ok::<_, warp::reject::Rejection>(
warp::http::Response::builder()
.status(200)
@@ -1004,16 +1085,24 @@ pub fn serve<E: EthSpec>(
serde_json::to_string(&json_payload)
.map_err(|_| reject("invalid JSON"))?,
)
.map(|res| add_consensus_version_header(res, fork_name))
.unwrap(),
)
},
);
} else {
Ok(warp::http::Response::builder()
.status(202)
.body("".to_string())
.map(|res| add_consensus_version_header(res, fork_name))
.unwrap())
}
},
);
let status = prefix
let status = prefix_v1
.and(warp::path("status"))
.then(|| async { warp::reply() });
.then(|| async { warp::reply().into_response() });
let header = prefix
let header = prefix_v1
.and(warp::path("header"))
.and(warp::path::param::<Slot>().or_else(|_| async { Err(reject("Invalid slot")) }))
.and(