[Merge] Optimistic Sync: Stage 1 (#2686)

* Add payload verification status to fork choice

* Pass payload verification status to import_block

* Add valid back-propagation

* Add head safety status latch to API

* Remove ExecutionLayerStatus

* Add execution info to client notifier

* Update notifier logs

* Change use of "hash" to refer to beacon block

* Shutdown on invalid finalized block

* Tidy, add comments

* Fix failing FC tests

* Allow blocks with unsafe head

* Fix forkchoiceUpdate call on startup
This commit is contained in:
Paul Hauner
2021-10-07 22:24:57 +11:00
parent aa1d57aa55
commit 6dde12f311
17 changed files with 395 additions and 156 deletions

View File

@@ -20,7 +20,7 @@ use beacon_chain::{
observed_operations::ObservationOutcome,
validator_monitor::{get_block_delay_ms, timestamp_now},
AttestationError as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes,
ExecutionLayerStatus, WhenSlotSkipped,
HeadSafetyStatus, WhenSlotSkipped,
};
use block_id::BlockId;
use eth2::types::{self as api_types, EndpointVersion, ValidatorId};
@@ -385,24 +385,31 @@ pub fn serve<T: BeaconChainTypes>(
)
.untuple_one();
// Create a `warp` filter that rejects requests unless the execution layer (EL) is ready.
let only_while_el_is_ready = warp::any()
// Create a `warp` filter that rejects requests unless the head has been verified by the
// execution layer.
let only_with_safe_head = warp::any()
.and(chain_filter.clone())
.and_then(move |chain: Arc<BeaconChain<T>>| async move {
let status = chain.execution_layer_status().await.map_err(|e| {
let status = chain.head_safety_status().map_err(|e| {
warp_utils::reject::custom_server_error(format!(
"failed to read execution engine status: {:?}",
"failed to read head safety status: {:?}",
e
))
})?;
match status {
ExecutionLayerStatus::Ready | ExecutionLayerStatus::NotRequired => Ok(()),
ExecutionLayerStatus::NotReady => Err(warp_utils::reject::custom_server_error(
"execution engine(s) not ready".to_string(),
)),
ExecutionLayerStatus::Missing => Err(warp_utils::reject::custom_server_error(
"no execution engines configured".to_string(),
)),
HeadSafetyStatus::Safe(_) => Ok(()),
HeadSafetyStatus::Unsafe(hash) => {
Err(warp_utils::reject::custom_server_error(format!(
"optimistic head hash {:?} has not been verified by the execution layer",
hash
)))
}
HeadSafetyStatus::Invalid(hash) => {
Err(warp_utils::reject::custom_server_error(format!(
"the head block has an invalid payload {:?}, this may be unrecoverable",
hash
)))
}
}
})
.untuple_one();
@@ -1103,7 +1110,6 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::body::json())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.and(only_while_el_is_ready.clone())
.and_then(
|chain: Arc<BeaconChain<T>>,
attestations: Vec<Attestation<T::EthSpec>>,
@@ -1401,7 +1407,6 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::body::json())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.and(only_while_el_is_ready.clone())
.and_then(
|chain: Arc<BeaconChain<T>>,
signatures: Vec<SyncCommitteeMessage>,
@@ -1831,7 +1836,6 @@ pub fn serve<T: BeaconChainTypes>(
}))
.and(warp::path::end())
.and(not_while_syncing_filter.clone())
.and(only_while_el_is_ready.clone())
.and(chain_filter.clone())
.and(log_filter.clone())
.and_then(|epoch: Epoch, chain: Arc<BeaconChain<T>>, log: Logger| {
@@ -1849,7 +1853,6 @@ pub fn serve<T: BeaconChainTypes>(
}))
.and(warp::path::end())
.and(not_while_syncing_filter.clone())
.and(only_while_el_is_ready.clone())
.and(warp::query::<api_types::ValidatorBlocksQuery>())
.and(chain_filter.clone())
.and_then(
@@ -1884,7 +1887,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(warp::query::<api_types::ValidatorAttestationDataQuery>())
.and(not_while_syncing_filter.clone())
.and(only_while_el_is_ready.clone())
.and(only_with_safe_head.clone())
.and(chain_filter.clone())
.and_then(
|query: api_types::ValidatorAttestationDataQuery, chain: Arc<BeaconChain<T>>| {
@@ -1917,7 +1920,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(warp::query::<api_types::ValidatorAggregateAttestationQuery>())
.and(not_while_syncing_filter.clone())
.and(only_while_el_is_ready.clone())
.and(only_with_safe_head.clone())
.and(chain_filter.clone())
.and_then(
|query: api_types::ValidatorAggregateAttestationQuery, chain: Arc<BeaconChain<T>>| {
@@ -1949,7 +1952,6 @@ pub fn serve<T: BeaconChainTypes>(
}))
.and(warp::path::end())
.and(not_while_syncing_filter.clone())
.and(only_while_el_is_ready.clone())
.and(warp::body::json())
.and(chain_filter.clone())
.and_then(
@@ -1972,7 +1974,6 @@ pub fn serve<T: BeaconChainTypes>(
}))
.and(warp::path::end())
.and(not_while_syncing_filter.clone())
.and(only_while_el_is_ready.clone())
.and(warp::body::json())
.and(chain_filter.clone())
.and_then(
@@ -1990,7 +1991,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(warp::query::<SyncContributionData>())
.and(not_while_syncing_filter.clone())
.and(only_while_el_is_ready.clone())
.and(only_with_safe_head)
.and(chain_filter.clone())
.and_then(
|sync_committee_data: SyncContributionData, chain: Arc<BeaconChain<T>>| {
@@ -2013,7 +2014,6 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("aggregate_and_proofs"))
.and(warp::path::end())
.and(not_while_syncing_filter.clone())
.and(only_while_el_is_ready.clone())
.and(chain_filter.clone())
.and(warp::body::json())
.and(network_tx_filter.clone())
@@ -2114,7 +2114,6 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("contribution_and_proofs"))
.and(warp::path::end())
.and(not_while_syncing_filter.clone())
.and(only_while_el_is_ready)
.and(chain_filter.clone())
.and(warp::body::json())
.and(network_tx_filter.clone())