exchangeCapabilities & Capella Readiness Logging (#3918)

* Undo Passing Spec to Engine API

* Utilize engine_exchangeCapabilities

* Add Logging to Indicate Capella Readiness

* Add exchangeCapabilities to mock_execution_layer

* Send Nested Array for engine_exchangeCapabilities

* Use Mutex Instead of RwLock for EngineCapabilities

* Improve Locking to Avoid Deadlock

* Prettier logic for get_engine_capabilities

* Improve Comments

* Update beacon_node/beacon_chain/src/capella_readiness.rs

Co-authored-by: Michael Sproul <micsproul@gmail.com>

* Update beacon_node/beacon_chain/src/capella_readiness.rs

Co-authored-by: Michael Sproul <micsproul@gmail.com>

* Update beacon_node/beacon_chain/src/capella_readiness.rs

Co-authored-by: Michael Sproul <micsproul@gmail.com>

* Update beacon_node/beacon_chain/src/capella_readiness.rs

Co-authored-by: Michael Sproul <micsproul@gmail.com>

* Update beacon_node/beacon_chain/src/capella_readiness.rs

Co-authored-by: Michael Sproul <micsproul@gmail.com>

* Update beacon_node/client/src/notifier.rs

Co-authored-by: Michael Sproul <micsproul@gmail.com>

* Update beacon_node/execution_layer/src/engine_api/http.rs

Co-authored-by: Michael Sproul <micsproul@gmail.com>

* Addressed Michael's Comments

---------

Co-authored-by: Michael Sproul <micsproul@gmail.com>
This commit is contained in:
ethDreamer
2023-01-31 11:26:23 -06:00
committed by GitHub
parent f615fb0885
commit 7b7595347d
18 changed files with 601 additions and 192 deletions

View File

@@ -6,20 +6,27 @@ use serde_json::Value as JsonValue;
use std::sync::Arc;
use types::{EthSpec, ForkName};
pub const GENERIC_ERROR_CODE: i64 = -1234;
pub const BAD_PARAMS_ERROR_CODE: i64 = -32602;
pub const UNKNOWN_PAYLOAD_ERROR_CODE: i64 = -38001;
pub const FORK_REQUEST_MISMATCH_ERROR_CODE: i64 = -32000;
pub async fn handle_rpc<T: EthSpec>(
body: JsonValue,
ctx: Arc<Context<T>>,
) -> Result<JsonValue, String> {
) -> Result<JsonValue, (String, i64)> {
*ctx.previous_request.lock() = Some(body.clone());
let method = body
.get("method")
.and_then(JsonValue::as_str)
.ok_or_else(|| "missing/invalid method field".to_string())?;
.ok_or_else(|| "missing/invalid method field".to_string())
.map_err(|s| (s, GENERIC_ERROR_CODE))?;
let params = body
.get("params")
.ok_or_else(|| "missing/invalid params field".to_string())?;
.ok_or_else(|| "missing/invalid params field".to_string())
.map_err(|s| (s, GENERIC_ERROR_CODE))?;
match method {
ETH_SYNCING => Ok(JsonValue::Bool(false)),
@@ -27,7 +34,8 @@ pub async fn handle_rpc<T: EthSpec>(
let tag = params
.get(0)
.and_then(JsonValue::as_str)
.ok_or_else(|| "missing/invalid params[0] value".to_string())?;
.ok_or_else(|| "missing/invalid params[0] value".to_string())
.map_err(|s| (s, BAD_PARAMS_ERROR_CODE))?;
match tag {
"latest" => Ok(serde_json::to_value(
@@ -36,7 +44,10 @@ pub async fn handle_rpc<T: EthSpec>(
.latest_execution_block(),
)
.unwrap()),
other => Err(format!("The tag {} is not supported", other)),
other => Err((
format!("The tag {} is not supported", other),
BAD_PARAMS_ERROR_CODE,
)),
}
}
ETH_GET_BLOCK_BY_HASH => {
@@ -47,7 +58,8 @@ pub async fn handle_rpc<T: EthSpec>(
.and_then(|s| {
s.parse()
.map_err(|e| format!("unable to parse hash: {:?}", e))
})?;
})
.map_err(|s| (s, BAD_PARAMS_ERROR_CODE))?;
// If we have a static response set, just return that.
if let Some(response) = *ctx.static_get_block_by_hash_response.lock() {
@@ -57,7 +69,8 @@ pub async fn handle_rpc<T: EthSpec>(
let full_tx = params
.get(1)
.and_then(JsonValue::as_bool)
.ok_or_else(|| "missing/invalid params[1] value".to_string())?;
.ok_or_else(|| "missing/invalid params[1] value".to_string())
.map_err(|s| (s, BAD_PARAMS_ERROR_CODE))?;
if full_tx {
Ok(serde_json::to_value(
ctx.execution_block_generator
@@ -76,15 +89,17 @@ pub async fn handle_rpc<T: EthSpec>(
}
ENGINE_NEW_PAYLOAD_V1 | ENGINE_NEW_PAYLOAD_V2 => {
let request = match method {
ENGINE_NEW_PAYLOAD_V1 => {
JsonExecutionPayload::V1(get_param::<JsonExecutionPayloadV1<T>>(params, 0)?)
}
ENGINE_NEW_PAYLOAD_V1 => JsonExecutionPayload::V1(
get_param::<JsonExecutionPayloadV1<T>>(params, 0)
.map_err(|s| (s, BAD_PARAMS_ERROR_CODE))?,
),
ENGINE_NEW_PAYLOAD_V2 => get_param::<JsonExecutionPayloadV2<T>>(params, 0)
.map(|jep| JsonExecutionPayload::V2(jep))
.or_else(|_| {
get_param::<JsonExecutionPayloadV1<T>>(params, 0)
.map(|jep| JsonExecutionPayload::V1(jep))
})?,
})
.map_err(|s| (s, BAD_PARAMS_ERROR_CODE))?,
// TODO(4844) add that here..
_ => unreachable!(),
};
@@ -97,20 +112,29 @@ pub async fn handle_rpc<T: EthSpec>(
match fork {
ForkName::Merge => {
if matches!(request, JsonExecutionPayload::V2(_)) {
return Err(format!(
"{} called with `ExecutionPayloadV2` before capella fork!",
method
return Err((
format!(
"{} called with `ExecutionPayloadV2` before Capella fork!",
method
),
GENERIC_ERROR_CODE,
));
}
}
ForkName::Capella => {
if method == ENGINE_NEW_PAYLOAD_V1 {
return Err(format!("{} called after capella fork!", method));
return Err((
format!("{} called after Capella fork!", method),
GENERIC_ERROR_CODE,
));
}
if matches!(request, JsonExecutionPayload::V1(_)) {
return Err(format!(
"{} called with `ExecutionPayloadV1` after capella fork!",
method
return Err((
format!(
"{} called with `ExecutionPayloadV1` after Capella fork!",
method
),
GENERIC_ERROR_CODE,
));
}
}
@@ -149,14 +173,20 @@ pub async fn handle_rpc<T: EthSpec>(
Ok(serde_json::to_value(JsonPayloadStatusV1::from(response)).unwrap())
}
ENGINE_GET_PAYLOAD_V1 | ENGINE_GET_PAYLOAD_V2 => {
let request: JsonPayloadIdRequest = get_param(params, 0)?;
let request: JsonPayloadIdRequest =
get_param(params, 0).map_err(|s| (s, BAD_PARAMS_ERROR_CODE))?;
let id = request.into();
let response = ctx
.execution_block_generator
.write()
.get_payload(&id)
.ok_or_else(|| format!("no payload for id {:?}", id))?;
.ok_or_else(|| {
(
format!("no payload for id {:?}", id),
UNKNOWN_PAYLOAD_ERROR_CODE,
)
})?;
// validate method called correctly according to shanghai fork time
if ctx
@@ -166,7 +196,10 @@ pub async fn handle_rpc<T: EthSpec>(
== ForkName::Capella
&& method == ENGINE_GET_PAYLOAD_V1
{
return Err(format!("{} called after capella fork!", method));
return Err((
format!("{} called after Capella fork!", method),
FORK_REQUEST_MISMATCH_ERROR_CODE,
));
}
// TODO(4844) add 4844 error checking here
@@ -195,38 +228,42 @@ pub async fn handle_rpc<T: EthSpec>(
}
}
ENGINE_FORKCHOICE_UPDATED_V1 | ENGINE_FORKCHOICE_UPDATED_V2 => {
let forkchoice_state: JsonForkchoiceStateV1 = get_param(params, 0)?;
let forkchoice_state: JsonForkchoiceStateV1 =
get_param(params, 0).map_err(|s| (s, BAD_PARAMS_ERROR_CODE))?;
let payload_attributes = match method {
ENGINE_FORKCHOICE_UPDATED_V1 => {
let jpa1: Option<JsonPayloadAttributesV1> = get_param(params, 1)?;
let jpa1: Option<JsonPayloadAttributesV1> =
get_param(params, 1).map_err(|s| (s, BAD_PARAMS_ERROR_CODE))?;
jpa1.map(JsonPayloadAttributes::V1)
}
ENGINE_FORKCHOICE_UPDATED_V2 => {
// we can't use `deny_unknown_fields` without breaking compatibility with some
// clients that haven't updated to the latest engine_api spec. So instead we'll
// need to deserialize based on timestamp
get_param::<Option<JsonPayloadAttributes>>(params, 1).and_then(|pa| {
pa.and_then(|pa| {
match ctx
.execution_block_generator
.read()
.get_fork_at_timestamp(*pa.timestamp())
{
ForkName::Merge => {
get_param::<Option<JsonPayloadAttributesV1>>(params, 1)
.map(|opt| opt.map(JsonPayloadAttributes::V1))
.transpose()
get_param::<Option<JsonPayloadAttributes>>(params, 1)
.and_then(|pa| {
pa.and_then(|pa| {
match ctx
.execution_block_generator
.read()
.get_fork_at_timestamp(*pa.timestamp())
{
ForkName::Merge => {
get_param::<Option<JsonPayloadAttributesV1>>(params, 1)
.map(|opt| opt.map(JsonPayloadAttributes::V1))
.transpose()
}
ForkName::Capella => {
get_param::<Option<JsonPayloadAttributesV2>>(params, 1)
.map(|opt| opt.map(JsonPayloadAttributes::V2))
.transpose()
}
_ => unreachable!(),
}
ForkName::Capella => {
get_param::<Option<JsonPayloadAttributesV2>>(params, 1)
.map(|opt| opt.map(JsonPayloadAttributes::V2))
.transpose()
}
_ => unreachable!(),
}
})
.transpose()
})
.transpose()
})?
.map_err(|s| (s, BAD_PARAMS_ERROR_CODE))?
}
_ => unreachable!(),
};
@@ -240,20 +277,29 @@ pub async fn handle_rpc<T: EthSpec>(
{
ForkName::Merge => {
if matches!(pa, JsonPayloadAttributes::V2(_)) {
return Err(format!(
"{} called with `JsonPayloadAttributesV2` before capella fork!",
method
return Err((
format!(
"{} called with `JsonPayloadAttributesV2` before Capella fork!",
method
),
GENERIC_ERROR_CODE,
));
}
}
ForkName::Capella => {
if method == ENGINE_FORKCHOICE_UPDATED_V1 {
return Err(format!("{} called after capella fork!", method));
return Err((
format!("{} called after Capella fork!", method),
FORK_REQUEST_MISMATCH_ERROR_CODE,
));
}
if matches!(pa, JsonPayloadAttributes::V1(_)) {
return Err(format!(
"{} called with `JsonPayloadAttributesV1` after capella fork!",
method
return Err((
format!(
"{} called with `JsonPayloadAttributesV1` after Capella fork!",
method
),
FORK_REQUEST_MISMATCH_ERROR_CODE,
));
}
}
@@ -281,10 +327,14 @@ pub async fn handle_rpc<T: EthSpec>(
return Ok(serde_json::to_value(response).unwrap());
}
let mut response = ctx.execution_block_generator.write().forkchoice_updated(
forkchoice_state.into(),
payload_attributes.map(|json| json.into()),
)?;
let mut response = ctx
.execution_block_generator
.write()
.forkchoice_updated(
forkchoice_state.into(),
payload_attributes.map(|json| json.into()),
)
.map_err(|s| (s, GENERIC_ERROR_CODE))?;
if let Some(mut status) = ctx.static_forkchoice_updated_response.lock().clone() {
if status.status == PayloadStatusV1Status::Valid {
@@ -305,9 +355,13 @@ pub async fn handle_rpc<T: EthSpec>(
};
Ok(serde_json::to_value(transition_config).unwrap())
}
other => Err(format!(
"The method {} does not exist/is not available",
other
ENGINE_EXCHANGE_CAPABILITIES => {
let engine_capabilities = ctx.engine_capabilities.read();
Ok(serde_json::to_value(engine_capabilities.to_response()).unwrap())
}
other => Err((
format!("The method {} does not exist/is not available", other),
METHOD_NOT_FOUND_CODE,
)),
}
}