mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-09 19:51:47 +00:00
Retrospective invalidation of exec. payloads for opt. sync (#2837)
## Issue Addressed
NA
## Proposed Changes
Adds the functionality to allow blocks to be validated/invalidated after their import as per the [optimistic sync spec](https://github.com/ethereum/consensus-specs/blob/dev/sync/optimistic.md#how-to-optimistically-import-blocks). This means:
- Updating `ProtoArray` to allow flipping the `execution_status` of ancestors/descendants based on payload validity updates.
- Creating separation between `execution_layer` and the `beacon_chain` by creating a `PayloadStatus` struct.
- Refactoring how the `execution_layer` selects a `PayloadStatus` from the multiple statuses returned from multiple EEs.
- Adding testing framework for optimistic imports.
- Add `ExecutionBlockHash(Hash256)` new-type struct to avoid confusion between *beacon block roots* and *execution payload hashes*.
- Add `merge` to [`FORKS`](c3a793fd73/Makefile (L17)) in the `Makefile` to ensure we test the beacon chain with merge settings.
- Fix some tests here that were failing due to a missing execution layer.
## TODO
- [ ] Balance tests
Co-authored-by: Mark Mackey <mark@sigmaprime.io>
This commit is contained in:
@@ -7,10 +7,11 @@
|
||||
use engine_api::{Error as ApiError, *};
|
||||
use engines::{Engine, EngineError, Engines, ForkChoiceState, Logging};
|
||||
use lru::LruCache;
|
||||
use payload_status::process_multiple_payload_statuses;
|
||||
use sensitive_url::SensitiveUrl;
|
||||
use slog::{crit, debug, error, info, Logger};
|
||||
use slot_clock::SlotClock;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::collections::HashMap;
|
||||
use std::future::Future;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
@@ -19,12 +20,14 @@ use tokio::{
|
||||
sync::{Mutex, MutexGuard},
|
||||
time::{sleep, sleep_until, Instant},
|
||||
};
|
||||
use types::{ChainSpec, Epoch, ProposerPreparationData};
|
||||
use types::{ChainSpec, Epoch, ExecutionBlockHash, ProposerPreparationData};
|
||||
|
||||
pub use engine_api::{http::HttpJsonRpc, PayloadAttributes, PayloadStatusV1Status};
|
||||
pub use payload_status::PayloadStatus;
|
||||
|
||||
mod engine_api;
|
||||
mod engines;
|
||||
mod payload_status;
|
||||
pub mod test_utils;
|
||||
|
||||
/// Each time the `ExecutionLayer` retrieves a block from an execution node, it stores that block
|
||||
@@ -50,6 +53,7 @@ pub enum Error {
|
||||
ShuttingDown,
|
||||
FeeRecipientUnspecified,
|
||||
ConsensusFailure,
|
||||
MissingLatestValidHash,
|
||||
}
|
||||
|
||||
impl From<ApiError> for Error {
|
||||
@@ -68,7 +72,7 @@ struct Inner {
|
||||
engines: Engines<HttpJsonRpc>,
|
||||
suggested_fee_recipient: Option<Address>,
|
||||
proposer_preparation_data: Mutex<HashMap<u64, ProposerPreparationDataEntry>>,
|
||||
execution_blocks: Mutex<LruCache<Hash256, ExecutionBlock>>,
|
||||
execution_blocks: Mutex<LruCache<ExecutionBlockHash, ExecutionBlock>>,
|
||||
executor: TaskExecutor,
|
||||
log: Logger,
|
||||
}
|
||||
@@ -137,7 +141,9 @@ impl ExecutionLayer {
|
||||
}
|
||||
|
||||
/// Note: this function returns a mutex guard, be careful to avoid deadlocks.
|
||||
async fn execution_blocks(&self) -> MutexGuard<'_, LruCache<Hash256, ExecutionBlock>> {
|
||||
async fn execution_blocks(
|
||||
&self,
|
||||
) -> MutexGuard<'_, LruCache<ExecutionBlockHash, ExecutionBlock>> {
|
||||
self.inner.execution_blocks.lock().await
|
||||
}
|
||||
|
||||
@@ -384,10 +390,10 @@ impl ExecutionLayer {
|
||||
/// will be contacted.
|
||||
pub async fn get_payload<T: EthSpec>(
|
||||
&self,
|
||||
parent_hash: Hash256,
|
||||
parent_hash: ExecutionBlockHash,
|
||||
timestamp: u64,
|
||||
random: Hash256,
|
||||
finalized_block_hash: Hash256,
|
||||
finalized_block_hash: ExecutionBlockHash,
|
||||
proposer_index: u64,
|
||||
) -> Result<ExecutionPayload<T>, Error> {
|
||||
let suggested_fee_recipient = self.get_suggested_fee_recipient(proposer_index).await;
|
||||
@@ -434,7 +440,16 @@ impl ExecutionLayer {
|
||||
)
|
||||
.await
|
||||
.map(|response| response.payload_id)?
|
||||
.ok_or(ApiError::PayloadIdUnavailable)?
|
||||
.ok_or_else(|| {
|
||||
error!(
|
||||
self.log(),
|
||||
"Exec engine unable to produce payload";
|
||||
"msg" => "No payload ID, the engine is likely syncing. \
|
||||
This has the potential to cause a missed block proposal.",
|
||||
);
|
||||
|
||||
ApiError::PayloadIdUnavailable
|
||||
})?
|
||||
};
|
||||
|
||||
engine.api.get_payload_v1(payload_id).await
|
||||
@@ -459,7 +474,7 @@ impl ExecutionLayer {
|
||||
pub async fn notify_new_payload<T: EthSpec>(
|
||||
&self,
|
||||
execution_payload: &ExecutionPayload<T>,
|
||||
) -> Result<(PayloadStatusV1Status, Option<Vec<Hash256>>), Error> {
|
||||
) -> Result<PayloadStatus, Error> {
|
||||
debug!(
|
||||
self.log(),
|
||||
"Issuing engine_newPayload";
|
||||
@@ -473,81 +488,11 @@ impl ExecutionLayer {
|
||||
.broadcast(|engine| engine.api.new_payload_v1(execution_payload.clone()))
|
||||
.await;
|
||||
|
||||
let mut errors = vec![];
|
||||
let mut valid = 0;
|
||||
let mut invalid = 0;
|
||||
let mut syncing = 0;
|
||||
let mut invalid_latest_valid_hash = HashSet::new();
|
||||
for result in broadcast_results {
|
||||
match result {
|
||||
Ok(response) => match (&response.latest_valid_hash, &response.status) {
|
||||
(Some(latest_hash), &PayloadStatusV1Status::Valid) => {
|
||||
// According to a strict interpretation of the spec, the EE should never
|
||||
// respond with `VALID` *and* a `latest_valid_hash`.
|
||||
//
|
||||
// For the sake of being liberal with what we accept, we will accept a
|
||||
// `latest_valid_hash` *only if* it matches the submitted payload.
|
||||
// Otherwise, register an error.
|
||||
if latest_hash == &execution_payload.block_hash {
|
||||
valid += 1;
|
||||
} else {
|
||||
errors.push(EngineError::Api {
|
||||
id: "unknown".to_string(),
|
||||
error: engine_api::Error::BadResponse(
|
||||
format!(
|
||||
"new_payload: response.status = Valid but invalid latest_valid_hash. Expected({:?}) Found({:?})",
|
||||
execution_payload.block_hash,
|
||||
latest_hash,
|
||||
)
|
||||
),
|
||||
});
|
||||
}
|
||||
}
|
||||
(Some(latest_hash), &PayloadStatusV1Status::Invalid) => {
|
||||
invalid += 1;
|
||||
invalid_latest_valid_hash.insert(*latest_hash);
|
||||
}
|
||||
(None, &PayloadStatusV1Status::InvalidBlockHash)
|
||||
| (None, &PayloadStatusV1Status::InvalidTerminalBlock) => invalid += 1,
|
||||
(None, &PayloadStatusV1Status::Syncing)
|
||||
| (None, &PayloadStatusV1Status::Accepted) => syncing += 1,
|
||||
_ => errors.push(EngineError::Api {
|
||||
id: "unknown".to_string(),
|
||||
error: engine_api::Error::BadResponse(format!(
|
||||
"new_payload: response does not conform to engine API spec: {:?}",
|
||||
response,
|
||||
)),
|
||||
}),
|
||||
},
|
||||
Err(e) => errors.push(e),
|
||||
}
|
||||
}
|
||||
|
||||
if valid > 0 && invalid > 0 {
|
||||
crit!(
|
||||
self.log(),
|
||||
"Consensus failure between execution nodes";
|
||||
"method" => "new_payload"
|
||||
);
|
||||
// In this situation, better to have a failure of liveness than vote on a potentially invalid chain
|
||||
return Err(Error::ConsensusFailure);
|
||||
}
|
||||
|
||||
if valid > 0 {
|
||||
Ok((
|
||||
PayloadStatusV1Status::Valid,
|
||||
Some(vec![execution_payload.block_hash]),
|
||||
))
|
||||
} else if invalid > 0 {
|
||||
Ok((
|
||||
PayloadStatusV1Status::Invalid,
|
||||
Some(invalid_latest_valid_hash.into_iter().collect()),
|
||||
))
|
||||
} else if syncing > 0 {
|
||||
Ok((PayloadStatusV1Status::Syncing, None))
|
||||
} else {
|
||||
Err(Error::EngineErrors(errors))
|
||||
}
|
||||
process_multiple_payload_statuses(
|
||||
execution_payload.block_hash,
|
||||
broadcast_results.into_iter(),
|
||||
self.log(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Maps to the `engine_consensusValidated` JSON-RPC call.
|
||||
@@ -565,10 +510,10 @@ impl ExecutionLayer {
|
||||
/// - An error, if all nodes return an error.
|
||||
pub async fn notify_forkchoice_updated(
|
||||
&self,
|
||||
head_block_hash: Hash256,
|
||||
finalized_block_hash: Hash256,
|
||||
head_block_hash: ExecutionBlockHash,
|
||||
finalized_block_hash: ExecutionBlockHash,
|
||||
payload_attributes: Option<PayloadAttributes>,
|
||||
) -> Result<(PayloadStatusV1Status, Option<Vec<Hash256>>), Error> {
|
||||
) -> Result<PayloadStatus, Error> {
|
||||
debug!(
|
||||
self.log(),
|
||||
"Issuing engine_forkchoiceUpdated";
|
||||
@@ -597,78 +542,13 @@ impl ExecutionLayer {
|
||||
})
|
||||
.await;
|
||||
|
||||
let mut errors = vec![];
|
||||
let mut valid = 0;
|
||||
let mut invalid = 0;
|
||||
let mut syncing = 0;
|
||||
let mut invalid_latest_valid_hash = HashSet::new();
|
||||
for result in broadcast_results {
|
||||
match result {
|
||||
Ok(response) => match (&response.payload_status.latest_valid_hash, &response.payload_status.status) {
|
||||
// TODO(bellatrix) a strict interpretation of the v1.0.0.alpha.6 spec says that
|
||||
// `latest_valid_hash` *cannot* be `None`. However, we accept it to maintain
|
||||
// Geth compatibility for the short term. See:
|
||||
//
|
||||
// https://github.com/ethereum/go-ethereum/issues/24404
|
||||
(None, &PayloadStatusV1Status::Valid) => valid += 1,
|
||||
(Some(latest_hash), &PayloadStatusV1Status::Valid) => {
|
||||
if latest_hash == &head_block_hash {
|
||||
valid += 1;
|
||||
} else {
|
||||
errors.push(EngineError::Api {
|
||||
id: "unknown".to_string(),
|
||||
error: engine_api::Error::BadResponse(
|
||||
format!(
|
||||
"forkchoice_updated: payload_status = Valid but invalid latest_valid_hash. Expected({:?}) Found({:?})",
|
||||
head_block_hash,
|
||||
*latest_hash,
|
||||
)
|
||||
),
|
||||
});
|
||||
}
|
||||
}
|
||||
(Some(latest_hash), &PayloadStatusV1Status::Invalid) => {
|
||||
invalid += 1;
|
||||
invalid_latest_valid_hash.insert(*latest_hash);
|
||||
}
|
||||
(None, &PayloadStatusV1Status::InvalidTerminalBlock) => invalid += 1,
|
||||
(None, &PayloadStatusV1Status::Syncing) => syncing += 1,
|
||||
_ => {
|
||||
errors.push(EngineError::Api {
|
||||
id: "unknown".to_string(),
|
||||
error: engine_api::Error::BadResponse(format!(
|
||||
"forkchoice_updated: response does not conform to engine API spec: {:?}",
|
||||
response
|
||||
)),
|
||||
})
|
||||
}
|
||||
}
|
||||
Err(e) => errors.push(e),
|
||||
}
|
||||
}
|
||||
|
||||
if valid > 0 && invalid > 0 {
|
||||
crit!(
|
||||
self.log(),
|
||||
"Consensus failure between execution nodes";
|
||||
"method" => "forkchoice_updated"
|
||||
);
|
||||
// In this situation, better to have a failure of liveness than vote on a potentially invalid chain
|
||||
return Err(Error::ConsensusFailure);
|
||||
}
|
||||
|
||||
if valid > 0 {
|
||||
Ok((PayloadStatusV1Status::Valid, Some(vec![head_block_hash])))
|
||||
} else if invalid > 0 {
|
||||
Ok((
|
||||
PayloadStatusV1Status::Invalid,
|
||||
Some(invalid_latest_valid_hash.into_iter().collect()),
|
||||
))
|
||||
} else if syncing > 0 {
|
||||
Ok((PayloadStatusV1Status::Syncing, None))
|
||||
} else {
|
||||
Err(Error::EngineErrors(errors))
|
||||
}
|
||||
process_multiple_payload_statuses(
|
||||
head_block_hash,
|
||||
broadcast_results
|
||||
.into_iter()
|
||||
.map(|result| result.map(|response| response.payload_status)),
|
||||
self.log(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Used during block production to determine if the merge has been triggered.
|
||||
@@ -681,12 +561,12 @@ impl ExecutionLayer {
|
||||
pub async fn get_terminal_pow_block_hash(
|
||||
&self,
|
||||
spec: &ChainSpec,
|
||||
) -> Result<Option<Hash256>, Error> {
|
||||
) -> Result<Option<ExecutionBlockHash>, Error> {
|
||||
let hash_opt = self
|
||||
.engines()
|
||||
.first_success(|engine| async move {
|
||||
let terminal_block_hash = spec.terminal_block_hash;
|
||||
if terminal_block_hash != Hash256::zero() {
|
||||
if terminal_block_hash != ExecutionBlockHash::zero() {
|
||||
if self
|
||||
.get_pow_block(engine, terminal_block_hash)
|
||||
.await?
|
||||
@@ -730,7 +610,7 @@ impl ExecutionLayer {
|
||||
&self,
|
||||
engine: &Engine<HttpJsonRpc>,
|
||||
spec: &ChainSpec,
|
||||
) -> Result<Option<Hash256>, ApiError> {
|
||||
) -> Result<Option<ExecutionBlockHash>, ApiError> {
|
||||
let mut block = engine
|
||||
.api
|
||||
.get_block_by_number(BlockByNumberQuery::Tag(LATEST_TAG))
|
||||
@@ -742,7 +622,7 @@ impl ExecutionLayer {
|
||||
loop {
|
||||
let block_reached_ttd = block.total_difficulty >= spec.terminal_total_difficulty;
|
||||
if block_reached_ttd {
|
||||
if block.parent_hash == Hash256::zero() {
|
||||
if block.parent_hash == ExecutionBlockHash::zero() {
|
||||
return Ok(Some(block.block_hash));
|
||||
}
|
||||
let parent = self
|
||||
@@ -790,7 +670,7 @@ impl ExecutionLayer {
|
||||
/// https://github.com/ethereum/consensus-specs/blob/v1.1.0/specs/merge/fork-choice.md
|
||||
pub async fn is_valid_terminal_pow_block_hash(
|
||||
&self,
|
||||
block_hash: Hash256,
|
||||
block_hash: ExecutionBlockHash,
|
||||
spec: &ChainSpec,
|
||||
) -> Result<Option<bool>, Error> {
|
||||
let broadcast_results = self
|
||||
@@ -869,7 +749,7 @@ impl ExecutionLayer {
|
||||
async fn get_pow_block(
|
||||
&self,
|
||||
engine: &Engine<HttpJsonRpc>,
|
||||
hash: Hash256,
|
||||
hash: ExecutionBlockHash,
|
||||
) -> Result<Option<ExecutionBlock>, ApiError> {
|
||||
if let Some(cached) = self.execution_blocks().await.get(&hash).copied() {
|
||||
// The block was in the cache, no need to request it from the execution
|
||||
@@ -963,7 +843,7 @@ mod test {
|
||||
MockExecutionLayer::default_params()
|
||||
.move_to_terminal_block()
|
||||
.with_terminal_block(|spec, el, _| async move {
|
||||
let missing_terminal_block = Hash256::repeat_byte(42);
|
||||
let missing_terminal_block = ExecutionBlockHash::repeat_byte(42);
|
||||
|
||||
assert_eq!(
|
||||
el.is_valid_terminal_pow_block_hash(missing_terminal_block, &spec)
|
||||
|
||||
Reference in New Issue
Block a user