mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-19 12:56:12 +00:00
Don't return errors when fork choice fails (#3370)
## Issue Addressed
NA
## Proposed Changes
There are scenarios where the only viable head will have an invalid execution payload, in this scenario the `get_head` function on `proto_array` will return an error. We must recover from this scenario by importing blocks from the network.
This PR stops `BeaconChain::recompute_head` from returning an error so that we can't accidentally start down-scoring peers or aborting block import just because the current head has an invalid payload.
## Reviewer Notes
The following changes are included:
1. Allow `fork_choice.get_head` to fail gracefully in `BeaconChain::process_block` when trying to update the `early_attester_cache`; simply don't add the block to the cache rather than aborting the entire process.
1. Don't return an error from `BeaconChain::recompute_head_at_current_slot` and `BeaconChain::recompute_head` to defensively prevent calling functions from aborting any process just because the fork choice function failed to run.
- This should have practically no effect, since most callers were still continuing if recomputing the head failed.
- The outlier is that the API will return 200 rather than a 500 when fork choice fails.
1. Add the `ProtoArrayForkChoice::set_all_blocks_to_optimistic` function to recover from the scenario where we've rebooted and the persisted fork choice has an invalid head.
This commit is contained in:
@@ -2805,32 +2805,38 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
if !payload_verification_status.is_optimistic()
|
||||
&& block.slot() + EARLY_ATTESTER_CACHE_HISTORIC_SLOTS >= current_slot
|
||||
{
|
||||
let new_head_root = fork_choice
|
||||
.get_head(current_slot, &self.spec)
|
||||
.map_err(BeaconChainError::from)?;
|
||||
|
||||
if new_head_root == block_root {
|
||||
if let Some(proto_block) = fork_choice.get_block(&block_root) {
|
||||
if let Err(e) = self.early_attester_cache.add_head_block(
|
||||
block_root,
|
||||
signed_block.clone(),
|
||||
proto_block,
|
||||
&state,
|
||||
&self.spec,
|
||||
) {
|
||||
match fork_choice.get_head(current_slot, &self.spec) {
|
||||
// This block became the head, add it to the early attester cache.
|
||||
Ok(new_head_root) if new_head_root == block_root => {
|
||||
if let Some(proto_block) = fork_choice.get_block(&block_root) {
|
||||
if let Err(e) = self.early_attester_cache.add_head_block(
|
||||
block_root,
|
||||
signed_block.clone(),
|
||||
proto_block,
|
||||
&state,
|
||||
&self.spec,
|
||||
) {
|
||||
warn!(
|
||||
self.log,
|
||||
"Early attester cache insert failed";
|
||||
"error" => ?e
|
||||
);
|
||||
}
|
||||
} else {
|
||||
warn!(
|
||||
self.log,
|
||||
"Early attester cache insert failed";
|
||||
"error" => ?e
|
||||
"Early attester block missing";
|
||||
"block_root" => ?block_root
|
||||
);
|
||||
}
|
||||
} else {
|
||||
warn!(
|
||||
self.log,
|
||||
"Early attester block missing";
|
||||
"block_root" => ?block_root
|
||||
);
|
||||
}
|
||||
// This block did not become the head, nothing to do.
|
||||
Ok(_) => (),
|
||||
Err(e) => error!(
|
||||
self.log,
|
||||
"Failed to compute head during block import";
|
||||
"error" => ?e
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3608,16 +3614,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
|
||||
// Run fork choice since it's possible that the payload invalidation might result in a new
|
||||
// head.
|
||||
//
|
||||
// Don't return early though, since invalidating the justified checkpoint might cause an
|
||||
// error here.
|
||||
if let Err(e) = self.recompute_head_at_current_slot().await {
|
||||
crit!(
|
||||
self.log,
|
||||
"Failed to run fork choice routine";
|
||||
"error" => ?e,
|
||||
);
|
||||
}
|
||||
self.recompute_head_at_current_slot().await;
|
||||
|
||||
// Obtain the justified root from fork choice.
|
||||
//
|
||||
@@ -4262,14 +4259,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
}
|
||||
|
||||
// Run fork choice and signal to any waiting task that it has completed.
|
||||
if let Err(e) = self.recompute_head_at_current_slot().await {
|
||||
error!(
|
||||
self.log,
|
||||
"Fork choice error at slot start";
|
||||
"error" => ?e,
|
||||
"slot" => slot,
|
||||
);
|
||||
}
|
||||
self.recompute_head_at_current_slot().await;
|
||||
|
||||
// Send the notification regardless of fork choice success, this is a "best effort"
|
||||
// notification and we don't want block production to hit the timeout in case of error.
|
||||
|
||||
@@ -434,9 +434,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
/// Execute the fork choice algorithm and enthrone the result as the canonical head.
|
||||
///
|
||||
/// This method replaces the old `BeaconChain::fork_choice` method.
|
||||
pub async fn recompute_head_at_current_slot(self: &Arc<Self>) -> Result<(), Error> {
|
||||
let current_slot = self.slot()?;
|
||||
self.recompute_head_at_slot(current_slot).await
|
||||
pub async fn recompute_head_at_current_slot(self: &Arc<Self>) {
|
||||
match self.slot() {
|
||||
Ok(current_slot) => self.recompute_head_at_slot(current_slot).await,
|
||||
Err(e) => error!(
|
||||
self.log,
|
||||
"No slot when recomputing head";
|
||||
"error" => ?e
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
/// Execute the fork choice algorithm and enthrone the result as the canonical head.
|
||||
@@ -445,7 +451,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
/// different slot to the wall-clock can be useful for pushing fork choice into the next slot
|
||||
/// *just* before the start of the slot. This ensures that block production can use the correct
|
||||
/// head value without being delayed.
|
||||
pub async fn recompute_head_at_slot(self: &Arc<Self>, current_slot: Slot) -> Result<(), Error> {
|
||||
///
|
||||
/// This function purposefully does *not* return a `Result`. It's possible for fork choice to
|
||||
/// fail to update if there is only one viable head and it has an invalid execution payload. In
|
||||
/// such a case it's critical that the `BeaconChain` keeps importing blocks so that the
|
||||
/// situation can be rectified. We avoid returning an error here so that calling functions
|
||||
/// can't abort block import because an error is returned here.
|
||||
pub async fn recompute_head_at_slot(self: &Arc<Self>, current_slot: Slot) {
|
||||
metrics::inc_counter(&metrics::FORK_CHOICE_REQUESTS);
|
||||
let _timer = metrics::start_timer(&metrics::FORK_CHOICE_TIMES);
|
||||
|
||||
@@ -455,15 +467,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
move || chain.recompute_head_at_slot_internal(current_slot),
|
||||
"recompute_head_internal",
|
||||
)
|
||||
.await?
|
||||
.await
|
||||
{
|
||||
// Fork choice returned successfully and did not need to update the EL.
|
||||
Ok(None) => Ok(()),
|
||||
Ok(Ok(None)) => (),
|
||||
// Fork choice returned successfully and needed to update the EL. It has returned a
|
||||
// join-handle from when it spawned some async tasks. We should await those tasks.
|
||||
Ok(Some(join_handle)) => match join_handle.await {
|
||||
Ok(Ok(Some(join_handle))) => match join_handle.await {
|
||||
// The async task completed successfully.
|
||||
Ok(Some(())) => Ok(()),
|
||||
Ok(Some(())) => (),
|
||||
// The async task did not complete successfully since the runtime is shutting down.
|
||||
Ok(None) => {
|
||||
debug!(
|
||||
@@ -471,7 +483,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
"Did not update EL fork choice";
|
||||
"info" => "shutting down"
|
||||
);
|
||||
Err(Error::RuntimeShutdown)
|
||||
}
|
||||
// The async task did not complete successfully, tokio returned an error.
|
||||
Err(e) => {
|
||||
@@ -480,13 +491,24 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
"Did not update EL fork choice";
|
||||
"error" => ?e
|
||||
);
|
||||
Err(Error::TokioJoin(e))
|
||||
}
|
||||
},
|
||||
// There was an error recomputing the head.
|
||||
Err(e) => {
|
||||
Ok(Err(e)) => {
|
||||
metrics::inc_counter(&metrics::FORK_CHOICE_ERRORS);
|
||||
Err(e)
|
||||
error!(
|
||||
self.log,
|
||||
"Error whist recomputing head";
|
||||
"error" => ?e
|
||||
);
|
||||
}
|
||||
// There was an error spawning the task.
|
||||
Err(e) => {
|
||||
error!(
|
||||
self.log,
|
||||
"Failed to spawn recompute head task";
|
||||
"error" => ?e
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -220,14 +220,7 @@ async fn state_advance_timer<T: BeaconChainTypes>(
|
||||
return;
|
||||
}
|
||||
|
||||
if let Err(e) = beacon_chain.recompute_head_at_slot(next_slot).await {
|
||||
warn!(
|
||||
log,
|
||||
"Error updating fork choice for next slot";
|
||||
"error" => ?e,
|
||||
"slot" => next_slot,
|
||||
);
|
||||
}
|
||||
beacon_chain.recompute_head_at_slot(next_slot).await;
|
||||
|
||||
// Use a blocking task to avoid blocking the core executor whilst waiting for locks
|
||||
// in `ForkChoiceSignalTx`.
|
||||
|
||||
@@ -211,6 +211,20 @@ impl<E: EthSpec> Builder<EphemeralHarnessType<E>> {
|
||||
self.store = Some(store);
|
||||
self.store_mutator(Box::new(mutator))
|
||||
}
|
||||
|
||||
/// Manually restore from a given `MemoryStore`.
|
||||
pub fn resumed_ephemeral_store(
|
||||
mut self,
|
||||
store: Arc<HotColdDB<E, MemoryStore<E>, MemoryStore<E>>>,
|
||||
) -> Self {
|
||||
let mutator = move |builder: BeaconChainBuilder<_>| {
|
||||
builder
|
||||
.resume_from_db()
|
||||
.expect("should resume from database")
|
||||
};
|
||||
self.store = Some(store);
|
||||
self.store_mutator(Box::new(mutator))
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: EthSpec> Builder<DiskHarnessType<E>> {
|
||||
@@ -1376,7 +1390,7 @@ where
|
||||
.process_block(Arc::new(block), CountUnrealized::True)
|
||||
.await?
|
||||
.into();
|
||||
self.chain.recompute_head_at_current_slot().await?;
|
||||
self.chain.recompute_head_at_current_slot().await;
|
||||
Ok(block_hash)
|
||||
}
|
||||
|
||||
@@ -1389,7 +1403,7 @@ where
|
||||
.process_block(Arc::new(block), CountUnrealized::True)
|
||||
.await?
|
||||
.into();
|
||||
self.chain.recompute_head_at_current_slot().await?;
|
||||
self.chain.recompute_head_at_current_slot().await;
|
||||
Ok(block_hash)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user