diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index a9e26e4875..2e944f2939 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2805,32 +2805,38 @@ impl BeaconChain { if !payload_verification_status.is_optimistic() && block.slot() + EARLY_ATTESTER_CACHE_HISTORIC_SLOTS >= current_slot { - let new_head_root = fork_choice - .get_head(current_slot, &self.spec) - .map_err(BeaconChainError::from)?; - - if new_head_root == block_root { - if let Some(proto_block) = fork_choice.get_block(&block_root) { - if let Err(e) = self.early_attester_cache.add_head_block( - block_root, - signed_block.clone(), - proto_block, - &state, - &self.spec, - ) { + match fork_choice.get_head(current_slot, &self.spec) { + // This block became the head, add it to the early attester cache. + Ok(new_head_root) if new_head_root == block_root => { + if let Some(proto_block) = fork_choice.get_block(&block_root) { + if let Err(e) = self.early_attester_cache.add_head_block( + block_root, + signed_block.clone(), + proto_block, + &state, + &self.spec, + ) { + warn!( + self.log, + "Early attester cache insert failed"; + "error" => ?e + ); + } + } else { warn!( self.log, - "Early attester cache insert failed"; - "error" => ?e + "Early attester block missing"; + "block_root" => ?block_root ); } - } else { - warn!( - self.log, - "Early attester block missing"; - "block_root" => ?block_root - ); } + // This block did not become the head, nothing to do. + Ok(_) => (), + Err(e) => error!( + self.log, + "Failed to compute head during block import"; + "error" => ?e + ), } } @@ -3608,16 +3614,7 @@ impl BeaconChain { // Run fork choice since it's possible that the payload invalidation might result in a new // head. - // - // Don't return early though, since invalidating the justified checkpoint might cause an - // error here. 
- if let Err(e) = self.recompute_head_at_current_slot().await { - crit!( - self.log, - "Failed to run fork choice routine"; - "error" => ?e, - ); - } + self.recompute_head_at_current_slot().await; // Obtain the justified root from fork choice. // @@ -4262,14 +4259,7 @@ impl BeaconChain { } // Run fork choice and signal to any waiting task that it has completed. - if let Err(e) = self.recompute_head_at_current_slot().await { - error!( - self.log, - "Fork choice error at slot start"; - "error" => ?e, - "slot" => slot, - ); - } + self.recompute_head_at_current_slot().await; // Send the notification regardless of fork choice success, this is a "best effort" // notification and we don't want block production to hit the timeout in case of error. diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs index aff4deeaf9..c37f266824 100644 --- a/beacon_node/beacon_chain/src/canonical_head.rs +++ b/beacon_node/beacon_chain/src/canonical_head.rs @@ -434,9 +434,15 @@ impl BeaconChain { /// Execute the fork choice algorithm and enthrone the result as the canonical head. /// /// This method replaces the old `BeaconChain::fork_choice` method. - pub async fn recompute_head_at_current_slot(self: &Arc) -> Result<(), Error> { - let current_slot = self.slot()?; - self.recompute_head_at_slot(current_slot).await + pub async fn recompute_head_at_current_slot(self: &Arc) { + match self.slot() { + Ok(current_slot) => self.recompute_head_at_slot(current_slot).await, + Err(e) => error!( + self.log, + "No slot when recomputing head"; + "error" => ?e + ), + } } /// Execute the fork choice algorithm and enthrone the result as the canonical head. @@ -445,7 +451,13 @@ impl BeaconChain { /// different slot to the wall-clock can be useful for pushing fork choice into the next slot /// *just* before the start of the slot. This ensures that block production can use the correct /// head value without being delayed. 
- pub async fn recompute_head_at_slot(self: &Arc, current_slot: Slot) -> Result<(), Error> { + /// + /// This function purposefully does *not* return a `Result`. It's possible for fork choice to + /// fail to update if there is only one viable head and it has an invalid execution payload. In + /// such a case it's critical that the `BeaconChain` keeps importing blocks so that the + /// situation can be rectified. We avoid returning an error here so that calling functions + /// can't abort block import because an error is returned here. + pub async fn recompute_head_at_slot(self: &Arc, current_slot: Slot) { metrics::inc_counter(&metrics::FORK_CHOICE_REQUESTS); let _timer = metrics::start_timer(&metrics::FORK_CHOICE_TIMES); @@ -455,15 +467,15 @@ impl BeaconChain { move || chain.recompute_head_at_slot_internal(current_slot), "recompute_head_internal", ) - .await? + .await { // Fork choice returned successfully and did not need to update the EL. - Ok(None) => Ok(()), + Ok(Ok(None)) => (), // Fork choice returned successfully and needed to update the EL. It has returned a // join-handle from when it spawned some async tasks. We should await those tasks. - Ok(Some(join_handle)) => match join_handle.await { + Ok(Ok(Some(join_handle))) => match join_handle.await { // The async task completed successfully. - Ok(Some(())) => Ok(()), + Ok(Some(())) => (), // The async task did not complete successfully since the runtime is shutting down. Ok(None) => { debug!( @@ -471,7 +483,6 @@ impl BeaconChain { "Did not update EL fork choice"; "info" => "shutting down" ); - Err(Error::RuntimeShutdown) } // The async task did not complete successfully, tokio returned an error. Err(e) => { @@ -480,13 +491,24 @@ impl BeaconChain { "Did not update EL fork choice"; "error" => ?e ); - Err(Error::TokioJoin(e)) } }, // There was an error recomputing the head. 
- Err(e) => { + Ok(Err(e)) => { metrics::inc_counter(&metrics::FORK_CHOICE_ERRORS); - Err(e) + error!( + self.log, + "Error whilst recomputing head"; + "error" => ?e + ); + } + // There was an error spawning the task. + Err(e) => { + error!( + self.log, + "Failed to spawn recompute head task"; + "error" => ?e + ); + } } } diff --git a/beacon_node/beacon_chain/src/state_advance_timer.rs b/beacon_node/beacon_chain/src/state_advance_timer.rs index 5abec98877..48c0f2f8a2 100644 --- a/beacon_node/beacon_chain/src/state_advance_timer.rs +++ b/beacon_node/beacon_chain/src/state_advance_timer.rs @@ -220,14 +220,7 @@ async fn state_advance_timer( return; } - if let Err(e) = beacon_chain.recompute_head_at_slot(next_slot).await { - warn!( - log, - "Error updating fork choice for next slot"; - "error" => ?e, - "slot" => next_slot, - ); - } + beacon_chain.recompute_head_at_slot(next_slot).await; // Use a blocking task to avoid blocking the core executor whilst waiting for locks // in `ForkChoiceSignalTx`. diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 1297e7d78b..1f19465c08 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -211,6 +211,20 @@ impl Builder> { self.store = Some(store); self.store_mutator(Box::new(mutator)) } + + /// Manually restore from a given `MemoryStore`. + pub fn resumed_ephemeral_store( + mut self, + store: Arc, MemoryStore>>, + ) -> Self { + let mutator = move |builder: BeaconChainBuilder<_>| { + builder + .resume_from_db() + .expect("should resume from database") + }; + self.store = Some(store); + self.store_mutator(Box::new(mutator)) + } } impl Builder> { @@ -1376,7 +1390,7 @@ where .process_block(Arc::new(block), CountUnrealized::True) .await? 
.into(); - self.chain.recompute_head_at_current_slot().await?; + self.chain.recompute_head_at_current_slot().await; Ok(block_hash) } @@ -1389,7 +1403,7 @@ where .process_block(Arc::new(block), CountUnrealized::True) .await? .into(); - self.chain.recompute_head_at_current_slot().await?; + self.chain.recompute_head_at_current_slot().await; Ok(block_hash) } diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index 43dda7ab05..88d6914036 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -160,11 +160,7 @@ async fn chain_segment_full_segment() { .into_block_error() .expect("should import chain segment"); - harness - .chain - .recompute_head_at_current_slot() - .await - .expect("should run fork choice"); + harness.chain.recompute_head_at_current_slot().await; assert_eq!( harness.head_block_root(), @@ -194,11 +190,7 @@ async fn chain_segment_varying_chunk_size() { .unwrap_or_else(|_| panic!("should import chain segment of len {}", chunk_size)); } - harness - .chain - .recompute_head_at_current_slot() - .await - .expect("should run fork choice"); + harness.chain.recompute_head_at_current_slot().await; assert_eq!( harness.head_block_root(), @@ -729,11 +721,7 @@ async fn block_gossip_verification() { } // Recompute the head to ensure we cache the latest view of fork choice. 
- harness - .chain - .recompute_head_at_current_slot() - .await - .unwrap(); + harness.chain.recompute_head_at_current_slot().await; /* * This test ensures that: diff --git a/beacon_node/beacon_chain/tests/payload_invalidation.rs b/beacon_node/beacon_chain/tests/payload_invalidation.rs index f2ebb430d4..4107631378 100644 --- a/beacon_node/beacon_chain/tests/payload_invalidation.rs +++ b/beacon_node/beacon_chain/tests/payload_invalidation.rs @@ -1,6 +1,7 @@ #![cfg(not(debug_assertions))] use beacon_chain::{ + canonical_head::{CachedHead, CanonicalHead}, test_utils::{BeaconChainHarness, EphemeralHarnessType}, BeaconChainError, BlockError, ExecutionPayloadError, StateSkipConfig, WhenSlotSkipped, INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON, @@ -14,6 +15,7 @@ use fork_choice::{ }; use proto_array::{Error as ProtoArrayError, ExecutionStatus}; use slot_clock::SlotClock; +use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use task_executor::ShutdownReason; @@ -95,11 +97,15 @@ impl InvalidPayloadRig { } async fn recompute_head(&self) { - self.harness - .chain - .recompute_head_at_current_slot() - .await - .unwrap(); + self.harness.chain.recompute_head_at_current_slot().await; + } + + fn cached_head(&self) -> CachedHead { + self.harness.chain.canonical_head.cached_head() + } + + fn canonical_head(&self) -> &CanonicalHead> { + &self.harness.chain.canonical_head } fn previous_forkchoice_update_params(&self) -> (ForkChoiceState, PayloadAttributes) { @@ -354,6 +360,19 @@ impl InvalidPayloadRig { .await .unwrap(); } + + fn assert_get_head_error_contains(&self, s: &str) { + match self + .harness + .chain + .canonical_head + .fork_choice_write_lock() + .get_head(self.harness.chain.slot().unwrap(), &self.harness.chain.spec) + { + Err(ForkChoiceError::ProtoArrayError(e)) if e.contains(s) => (), + other => panic!("expected {} error, got {:?}", s, other), + }; + } } /// Simple test of the different import types. 
@@ -1183,3 +1202,235 @@ async fn attesting_to_optimistic_head() { get_aggregated().unwrap(); get_aggregated_by_slot_and_root().unwrap(); } + +/// Helper for running tests where we generate a chain with an invalid head and then some +/// `fork_blocks` to recover it. +struct InvalidHeadSetup { + rig: InvalidPayloadRig, + fork_blocks: Vec>>, + invalid_head: CachedHead, +} + +impl InvalidHeadSetup { + async fn new() -> InvalidHeadSetup { + let mut rig = InvalidPayloadRig::new().enable_attestations(); + rig.move_to_terminal_block(); + rig.import_block(Payload::Valid).await; // Import a valid transition block. + + // Import blocks until the first time the chain finalizes. + while rig.cached_head().finalized_checkpoint().epoch == 0 { + rig.import_block(Payload::Syncing).await; + } + + let invalid_head = rig.cached_head(); + + // Invalidate the head block. + rig.invalidate_manually(invalid_head.head_block_root()) + .await; + assert!(rig + .canonical_head() + .head_execution_status() + .unwrap() + .is_invalid()); + + // Finding a new head should fail since the only possible head is not valid. + rig.assert_get_head_error_contains("InvalidBestNode"); + + // Build three "fork" blocks that conflict with the current canonical head. Don't apply them to + // the chain yet. + let mut fork_blocks = vec![]; + let mut parent_state = rig + .harness + .chain + .state_at_slot( + invalid_head.head_slot() - 3, + StateSkipConfig::WithStateRoots, + ) + .unwrap(); + for _ in 0..3 { + let slot = parent_state.slot() + 1; + let (fork_block, post_state) = rig.harness.make_block(parent_state, slot).await; + parent_state = post_state; + fork_blocks.push(Arc::new(fork_block)) + } + + Self { + rig, + fork_blocks, + invalid_head, + } + } +} + +#[tokio::test] +async fn recover_from_invalid_head_by_importing_blocks() { + let InvalidHeadSetup { + rig, + fork_blocks, + invalid_head, + } = InvalidHeadSetup::new().await; + + // Import the first two blocks, they should not become the head. 
+ for i in 0..2 { + if i == 0 { + // The first block should be `VALID` during import. + rig.harness + .mock_execution_layer + .as_ref() + .unwrap() + .server + .all_payloads_valid_on_new_payload(); + } else { + // All blocks after the first block should return `SYNCING`. + rig.harness + .mock_execution_layer + .as_ref() + .unwrap() + .server + .all_payloads_syncing_on_new_payload(true); + } + + rig.harness + .chain + .process_block(fork_blocks[i].clone(), CountUnrealized::True) + .await + .unwrap(); + rig.recompute_head().await; + rig.assert_get_head_error_contains("InvalidBestNode"); + let new_head = rig.cached_head(); + assert_eq!( + new_head.head_block_root(), + invalid_head.head_block_root(), + "the head should not change" + ); + } + + // Import the third block, it should become the head. + rig.harness + .chain + .process_block(fork_blocks[2].clone(), CountUnrealized::True) + .await + .unwrap(); + rig.recompute_head().await; + let new_head = rig.cached_head(); + assert_eq!( + new_head.head_block_root(), + fork_blocks[2].canonical_root(), + "the third block should become the head" + ); + + let manual_get_head = rig + .harness + .chain + .canonical_head + .fork_choice_write_lock() + .get_head(rig.harness.chain.slot().unwrap(), &rig.harness.chain.spec) + .unwrap(); + assert_eq!(manual_get_head, new_head.head_block_root(),); +} + +#[tokio::test] +async fn recover_from_invalid_head_after_persist_and_reboot() { + let InvalidHeadSetup { + rig, + fork_blocks: _, + invalid_head, + } = InvalidHeadSetup::new().await; + + // Forcefully persist the head and fork choice. + rig.harness.chain.persist_head_and_fork_choice().unwrap(); + + let resumed = BeaconChainHarness::builder(MainnetEthSpec) + .default_spec() + .deterministic_keypairs(VALIDATOR_COUNT) + .resumed_ephemeral_store(rig.harness.chain.store.clone()) + .mock_execution_layer() + .build(); + + // Forget the original rig so we don't accidentally use it again. 
+ drop(rig); + + let resumed_head = resumed.chain.canonical_head.cached_head(); + assert_eq!( + resumed_head.head_block_root(), + invalid_head.head_block_root(), + "the resumed harness should have the invalid block as the head" + ); + assert!( + resumed + .chain + .canonical_head + .fork_choice_read_lock() + .is_optimistic_block(&resumed_head.head_block_root()) + .unwrap(), + "the invalid block should have become optimistic" + ); +} + +#[tokio::test] +async fn weights_after_resetting_optimistic_status() { + let mut rig = InvalidPayloadRig::new().enable_attestations(); + rig.move_to_terminal_block(); + rig.import_block(Payload::Valid).await; // Import a valid transition block. + + let mut roots = vec![]; + for _ in 0..4 { + roots.push(rig.import_block(Payload::Syncing).await); + } + + rig.recompute_head().await; + let head = rig.cached_head(); + + let original_weights = rig + .harness + .chain + .canonical_head + .fork_choice_read_lock() + .proto_array() + .iter_nodes(&head.head_block_root()) + .map(|node| (node.root, node.weight)) + .collect::>(); + + rig.invalidate_manually(roots[1]).await; + + rig.harness + .chain + .canonical_head + .fork_choice_write_lock() + .proto_array_mut() + .set_all_blocks_to_optimistic::(&rig.harness.chain.spec) + .unwrap(); + + let new_weights = rig + .harness + .chain + .canonical_head + .fork_choice_read_lock() + .proto_array() + .iter_nodes(&head.head_block_root()) + .map(|node| (node.root, node.weight)) + .collect::>(); + + assert_eq!(original_weights, new_weights); + + // Advance the current slot and run fork choice to remove proposer boost. 
+ rig.harness + .set_current_slot(rig.harness.chain.slot().unwrap() + 1); + rig.recompute_head().await; + + assert_eq!( + rig.harness + .chain + .canonical_head + .fork_choice_read_lock() + .get_block_weight(&head.head_block_root()) + .unwrap(), + head.snapshot.beacon_state.validators()[0].effective_balance, + "proposer boost should be removed from the head block and the vote of a single validator applied" + ); + + // Import a length of chain to ensure the chain can be built atop. + for _ in 0..E::slots_per_epoch() * 4 { + rig.import_block(Payload::Valid).await; + } +} diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index b5b8152e8d..d9d5ca20d7 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -2128,7 +2128,7 @@ async fn weak_subjectivity_sync() { .process_block(Arc::new(full_block), CountUnrealized::True) .await .unwrap(); - beacon_chain.recompute_head_at_current_slot().await.unwrap(); + beacon_chain.recompute_head_at_current_slot().await; // Check that the new block's state can be loaded correctly. let state_root = block.state_root(); @@ -2460,11 +2460,7 @@ async fn revert_minority_fork_on_resume() { .build(); // Head should now be just before the fork. - resumed_harness - .chain - .recompute_head_at_current_slot() - .await - .unwrap(); + resumed_harness.chain.recompute_head_at_current_slot().await; assert_eq!(resumed_harness.head_slot(), fork_slot - 1); // Head track should know the canonical head and the rogue head. @@ -2482,11 +2478,7 @@ async fn revert_minority_fork_on_resume() { .unwrap(); // The canonical head should be the block from the majority chain. 
- resumed_harness - .chain - .recompute_head_at_current_slot() - .await - .unwrap(); + resumed_harness.chain.recompute_head_at_current_slot().await; assert_eq!(resumed_harness.head_slot(), block.slot()); assert_eq!(resumed_harness.head_block_root(), block.canonical_root()); } diff --git a/beacon_node/beacon_chain/tests/tests.rs b/beacon_node/beacon_chain/tests/tests.rs index 80a122976f..f7d443748d 100644 --- a/beacon_node/beacon_chain/tests/tests.rs +++ b/beacon_node/beacon_chain/tests/tests.rs @@ -693,11 +693,7 @@ async fn run_skip_slot_test(skip_slots: u64) { harness_a.chain.head_snapshot().beacon_block_root ); - harness_b - .chain - .recompute_head_at_current_slot() - .await - .expect("should run fork choice"); + harness_b.chain.recompute_head_at_current_slot().await; assert_eq!( harness_b.chain.head_snapshot().beacon_block.slot(), diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 3284f874f9..c2503f392f 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1059,10 +1059,7 @@ pub fn serve( // Update the head since it's likely this block will become the new // head. - chain - .recompute_head_at_current_slot() - .await - .map_err(warp_utils::reject::beacon_chain_error)?; + chain.recompute_head_at_current_slot().await; // Perform some logging to inform users if their blocks are being produced // late. @@ -1186,10 +1183,7 @@ pub fn serve( Ok(_) => { // Update the head since it's likely this block will become the new // head. 
- chain - .recompute_head_at_current_slot() - .await - .map_err(warp_utils::reject::beacon_chain_error)?; + chain.recompute_head_at_current_slot().await; Ok(warp::reply::json(&())) } diff --git a/beacon_node/network/src/beacon_processor/tests.rs b/beacon_node/network/src/beacon_processor/tests.rs index d437cf0bed..05854ac1e2 100644 --- a/beacon_node/network/src/beacon_processor/tests.rs +++ b/beacon_node/network/src/beacon_processor/tests.rs @@ -221,7 +221,7 @@ impl TestRig { } pub async fn recompute_head(&self) { - self.chain.recompute_head_at_current_slot().await.unwrap() + self.chain.recompute_head_at_current_slot().await } pub fn head_root(&self) -> Hash256 { diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 1b1dc12d87..12172e0e53 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -927,21 +927,7 @@ impl Worker { "peer_id" => %peer_id ); - if let Err(e) = self.chain.recompute_head_at_current_slot().await { - error!( - self.log, - "Fork choice failed"; - "error" => ?e, - "location" => "block_gossip" - ) - } else { - debug!( - self.log, - "Fork choice success"; - "block" => ?block_root, - "location" => "block_gossip" - ) - } + self.chain.recompute_head_at_current_slot().await; } Err(BlockError::ParentUnknown { .. 
}) => { // Inform the sync manager to find parents for this block diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs index ffcadb8689..a27ba7bfa0 100644 --- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -111,7 +111,7 @@ impl Worker { None, ); - self.recompute_head("process_rpc_block").await; + self.chain.recompute_head_at_current_slot().await; } } // Sync handles these results @@ -248,7 +248,7 @@ impl Worker { ChainSegmentResult::Successful { imported_blocks } => { metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_SUCCESS_TOTAL); if imported_blocks > 0 { - self.recompute_head("process_blocks_ok").await; + self.chain.recompute_head_at_current_slot().await; } (imported_blocks, Ok(())) } @@ -259,7 +259,7 @@ impl Worker { metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_FAILED_TOTAL); let r = self.handle_failed_chain_segment(error); if imported_blocks > 0 { - self.recompute_head("process_blocks_err").await; + self.chain.recompute_head_at_current_slot().await; } (imported_blocks, r) } @@ -392,24 +392,6 @@ impl Worker { } } - /// Runs fork-choice on a given chain. This is used during block processing after one successful - /// block import. 
- async fn recompute_head(&self, location: &str) { - match self.chain.recompute_head_at_current_slot().await { - Ok(()) => debug!( - self.log, - "Fork choice success"; - "location" => location - ), - Err(e) => error!( - self.log, - "Fork choice failed"; - "error" => ?e, - "location" => location - ), - } - } - /// Helper function to handle a `BlockError` from `process_chain_segment` fn handle_failed_chain_segment( &self, diff --git a/consensus/fork_choice/src/fork_choice.rs b/consensus/fork_choice/src/fork_choice.rs index a31d8ade6b..c17c46a777 100644 --- a/consensus/fork_choice/src/fork_choice.rs +++ b/consensus/fork_choice/src/fork_choice.rs @@ -1451,7 +1451,17 @@ where _phantom: PhantomData, }; - fork_choice.get_head(current_slot, spec)?; + // If a call to `get_head` fails, the only known cause is because the only head with viable + // FFG properties has an invalid payload. In this scenario, set all the payloads back to + // an optimistic status so that we can have a head to start from. + if fork_choice.get_head(current_slot, spec).is_err() { + fork_choice + .proto_array + .set_all_blocks_to_optimistic::(spec)?; + // If the second attempt at finding a head fails, return an error since we do not + // expect this scenario. + fork_choice.get_head(current_slot, spec)?; + } Ok(fork_choice) } diff --git a/consensus/proto_array/src/proto_array.rs b/consensus/proto_array/src/proto_array.rs index 85a15fb60e..962408513e 100644 --- a/consensus/proto_array/src/proto_array.rs +++ b/consensus/proto_array/src/proto_array.rs @@ -980,7 +980,7 @@ impl ProtoArray { /// Returns `None` if there is an overflow or underflow when calculating the score. 
/// /// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/fork-choice.md#get_latest_attesting_balance -fn calculate_proposer_boost( +pub fn calculate_proposer_boost( validator_balances: &[u64], proposer_score_boost: u64, ) -> Option { diff --git a/consensus/proto_array/src/proto_array_fork_choice.rs b/consensus/proto_array/src/proto_array_fork_choice.rs index 4767919f70..3ecdc68a2e 100644 --- a/consensus/proto_array/src/proto_array_fork_choice.rs +++ b/consensus/proto_array/src/proto_array_fork_choice.rs @@ -1,5 +1,7 @@ use crate::error::Error; -use crate::proto_array::{InvalidationOperation, Iter, ProposerBoost, ProtoArray, ProtoNode}; +use crate::proto_array::{ + calculate_proposer_boost, InvalidationOperation, Iter, ProposerBoost, ProtoArray, ProtoNode, +}; use crate::ssz_container::SszContainer; use serde_derive::{Deserialize, Serialize}; use ssz::{Decode, Encode}; @@ -303,6 +305,106 @@ impl ProtoArrayForkChoice { .map_err(|e| format!("find_head failed: {:?}", e)) } + /// For all nodes, regardless of their relationship to the finalized block, set their execution + /// status to be optimistic. + /// + /// In practice this means forgetting any `VALID` or `INVALID` statuses. + pub fn set_all_blocks_to_optimistic( + &mut self, + spec: &ChainSpec, + ) -> Result<(), String> { + // Iterate backwards through all nodes in the `proto_array`. Whilst it's not strictly + // required to do this process in reverse, it seems natural when we consider how LMD votes + // are counted. + // + // This function will touch all blocks, even those that do not descend from the finalized + // block. Since this function is expected to run at start-up during very rare + // circumstances we prefer simplicity over efficiency. 
+ for node_index in (0..self.proto_array.nodes.len()).rev() { + let node = self + .proto_array + .nodes + .get_mut(node_index) + .ok_or("unreachable index out of bounds in proto_array nodes")?; + + match node.execution_status { + ExecutionStatus::Invalid(block_hash) => { + node.execution_status = ExecutionStatus::Optimistic(block_hash); + + // Restore the weight of the node, it would have been set to `0` in + // `apply_score_changes` when it was invalidated. + let mut restored_weight: u64 = self + .votes + .0 + .iter() + .enumerate() + .filter_map(|(validator_index, vote)| { + if vote.current_root == node.root { + // Any voting validator that does not have a balance should be + // ignored. This is consistent with `compute_deltas`. + self.balances.get(validator_index) + } else { + None + } + }) + .sum(); + + // If the invalid root was boosted, apply the weight to it and + // ancestors. + if let Some(proposer_score_boost) = spec.proposer_score_boost { + if self.proto_array.previous_proposer_boost.root == node.root { + // Compute the score based upon the current balances. We can't rely on + // the `previous_proposer_boost.score` since it is set to zero with an + // invalid node. + let proposer_score = + calculate_proposer_boost::(&self.balances, proposer_score_boost) + .ok_or("Failed to compute proposer boost")?; + // Store the score we've applied here so it can be removed in + // a later call to `apply_score_changes`. + self.proto_array.previous_proposer_boost.score = proposer_score; + // Apply this boost to this node. + restored_weight = restored_weight + .checked_add(proposer_score) + .ok_or("Overflow when adding boost to weight")?; + } + } + + // Add the restored weight to the node and all ancestors. 
+ if restored_weight > 0 { + let mut node_or_ancestor = node; + loop { + node_or_ancestor.weight = node_or_ancestor + .weight + .checked_add(restored_weight) + .ok_or("Overflow when adding weight to ancestor")?; + + if let Some(parent_index) = node_or_ancestor.parent { + node_or_ancestor = self + .proto_array + .nodes + .get_mut(parent_index) + .ok_or(format!("Missing parent index: {}", parent_index))?; + } else { + // This is either the finalized block or a block that does not + // descend from the finalized block. + break; + } + } + } + } + // There are no balance changes required if the node was either valid or + // optimistic. + ExecutionStatus::Valid(block_hash) | ExecutionStatus::Optimistic(block_hash) => { + node.execution_status = ExecutionStatus::Optimistic(block_hash) + } + // An irrelevant node cannot become optimistic, this is a no-op. + ExecutionStatus::Irrelevant(_) => (), + } + } + + Ok(()) + } + pub fn maybe_prune(&mut self, finalized_root: Hash256) -> Result<(), String> { self.proto_array .maybe_prune(finalized_root) diff --git a/testing/ef_tests/src/cases/fork_choice.rs b/testing/ef_tests/src/cases/fork_choice.rs index 7d90f2ee9a..65872efbe9 100644 --- a/testing/ef_tests/src/cases/fork_choice.rs +++ b/testing/ef_tests/src/cases/fork_choice.rs @@ -313,8 +313,7 @@ impl Tester { fn find_head(&self) -> Result, Error> { let chain = self.harness.chain.clone(); - self.block_on_dangerous(chain.recompute_head_at_current_slot())? - .map_err(|e| Error::InternalError(format!("failed to find head with {:?}", e)))?; + self.block_on_dangerous(chain.recompute_head_at_current_slot())?; Ok(self.harness.chain.canonical_head.cached_head()) }