mirror of
https://github.com/sigp/lighthouse.git
synced 2026-04-20 06:18:31 +00:00
Don't panic in forkchoiceUpdated handler (#3165)
## Issue Addressed Fix a panic due to misuse of the Tokio executor when processing a forkchoiceUpdated response. We were previously calling `process_invalid_execution_payload` from the async function `update_execution_engine_forkchoice_async`, which resulted in a panic because `process_invalid_execution_payload` contains a call to fork choice, which ultimately calls `block_on`. An example backtrace can be found here: https://gist.github.com/michaelsproul/ac5da03e203d6ffac672423eaf52fb20 ## Proposed Changes Wrap the call to `process_invalid_execution_payload` in a `spawn_blocking` so that `block_on` is no longer called from an async context. ## Additional Info - I've been thinking about how to catch bugs like this with static analysis (a new Clippy lint). - The payload validation tests have been re-worked to support distinct responses from the mock EE for newPayload and forkchoiceUpdated. Three new tests have been added covering the `Invalid`, `InvalidBlockHash` and `InvalidTerminalBlock` cases. - I think we need a bunch more tests of different legal and illegal variations
This commit is contained in:
@@ -21,13 +21,15 @@ const VALIDATOR_COUNT: usize = 32;
|
||||
|
||||
type E = MainnetEthSpec;
|
||||
|
||||
#[derive(PartialEq, Clone)]
|
||||
#[derive(PartialEq, Clone, Copy)]
|
||||
enum Payload {
|
||||
Valid,
|
||||
Invalid {
|
||||
latest_valid_hash: Option<ExecutionBlockHash>,
|
||||
},
|
||||
Syncing,
|
||||
InvalidBlockHash,
|
||||
InvalidTerminalBlock,
|
||||
}
|
||||
|
||||
struct InvalidPayloadRig {
|
||||
@@ -154,8 +156,9 @@ impl InvalidPayloadRig {
|
||||
assert_eq!(justified_checkpoint.epoch, 2);
|
||||
}
|
||||
|
||||
/// Import a block while setting the newPayload and forkchoiceUpdated responses to `is_valid`.
|
||||
fn import_block(&mut self, is_valid: Payload) -> Hash256 {
|
||||
self.import_block_parametric(is_valid, |error| {
|
||||
self.import_block_parametric(is_valid, is_valid, |error| {
|
||||
matches!(
|
||||
error,
|
||||
BlockError::ExecutionPayloadError(
|
||||
@@ -183,7 +186,8 @@ impl InvalidPayloadRig {
|
||||
|
||||
fn import_block_parametric<F: Fn(&BlockError<E>) -> bool>(
|
||||
&mut self,
|
||||
is_valid: Payload,
|
||||
new_payload_response: Payload,
|
||||
forkchoice_response: Payload,
|
||||
evaluate_error: F,
|
||||
) -> Hash256 {
|
||||
let mock_execution_layer = self.harness.mock_execution_layer.as_ref().unwrap();
|
||||
@@ -194,15 +198,54 @@ impl InvalidPayloadRig {
|
||||
let (block, post_state) = self.harness.make_block(state, slot);
|
||||
let block_root = block.canonical_root();
|
||||
|
||||
match is_valid {
|
||||
Payload::Valid | Payload::Syncing => {
|
||||
if is_valid == Payload::Syncing {
|
||||
// Importing a payload whilst returning `SYNCING` simulates an EE that obtains
|
||||
// the block via it's own means (e.g., devp2p).
|
||||
let should_import_payload = true;
|
||||
mock_execution_layer
|
||||
.server
|
||||
.all_payloads_syncing(should_import_payload);
|
||||
let set_new_payload = |payload: Payload| match payload {
|
||||
Payload::Valid => mock_execution_layer
|
||||
.server
|
||||
.all_payloads_valid_on_new_payload(),
|
||||
Payload::Syncing => mock_execution_layer
|
||||
.server
|
||||
.all_payloads_syncing_on_new_payload(true),
|
||||
Payload::Invalid { latest_valid_hash } => {
|
||||
let latest_valid_hash = latest_valid_hash
|
||||
.unwrap_or_else(|| self.block_hash(block.message().parent_root()));
|
||||
mock_execution_layer
|
||||
.server
|
||||
.all_payloads_invalid_on_new_payload(latest_valid_hash)
|
||||
}
|
||||
Payload::InvalidBlockHash => mock_execution_layer
|
||||
.server
|
||||
.all_payloads_invalid_block_hash_on_new_payload(),
|
||||
Payload::InvalidTerminalBlock => mock_execution_layer
|
||||
.server
|
||||
.all_payloads_invalid_terminal_block_on_new_payload(),
|
||||
};
|
||||
let set_forkchoice_updated = |payload: Payload| match payload {
|
||||
Payload::Valid => mock_execution_layer
|
||||
.server
|
||||
.all_payloads_valid_on_forkchoice_updated(),
|
||||
Payload::Syncing => mock_execution_layer
|
||||
.server
|
||||
.all_payloads_syncing_on_forkchoice_updated(),
|
||||
Payload::Invalid { latest_valid_hash } => {
|
||||
let latest_valid_hash = latest_valid_hash
|
||||
.unwrap_or_else(|| self.block_hash(block.message().parent_root()));
|
||||
mock_execution_layer
|
||||
.server
|
||||
.all_payloads_invalid_on_forkchoice_updated(latest_valid_hash)
|
||||
}
|
||||
Payload::InvalidBlockHash => mock_execution_layer
|
||||
.server
|
||||
.all_payloads_invalid_block_hash_on_forkchoice_updated(),
|
||||
Payload::InvalidTerminalBlock => mock_execution_layer
|
||||
.server
|
||||
.all_payloads_invalid_terminal_block_on_forkchoice_updated(),
|
||||
};
|
||||
|
||||
match (new_payload_response, forkchoice_response) {
|
||||
(Payload::Valid | Payload::Syncing, Payload::Valid | Payload::Syncing) => {
|
||||
if new_payload_response == Payload::Syncing {
|
||||
set_new_payload(new_payload_response);
|
||||
set_forkchoice_updated(forkchoice_response);
|
||||
} else {
|
||||
mock_execution_layer.server.full_payload_verification();
|
||||
}
|
||||
@@ -221,10 +264,12 @@ impl InvalidPayloadRig {
|
||||
|
||||
let execution_status = self.execution_status(root.into());
|
||||
|
||||
match is_valid {
|
||||
match forkchoice_response {
|
||||
Payload::Syncing => assert!(execution_status.is_optimistic()),
|
||||
Payload::Valid => assert!(execution_status.is_valid_and_post_bellatrix()),
|
||||
Payload::Invalid { .. } => unreachable!(),
|
||||
Payload::Invalid { .. }
|
||||
| Payload::InvalidBlockHash
|
||||
| Payload::InvalidTerminalBlock => unreachable!(),
|
||||
}
|
||||
|
||||
assert_eq!(
|
||||
@@ -233,35 +278,46 @@ impl InvalidPayloadRig {
|
||||
"block from db must match block imported"
|
||||
);
|
||||
}
|
||||
Payload::Invalid { latest_valid_hash } => {
|
||||
let latest_valid_hash = latest_valid_hash
|
||||
.unwrap_or_else(|| self.block_hash(block.message().parent_root()));
|
||||
|
||||
mock_execution_layer
|
||||
.server
|
||||
.all_payloads_invalid(latest_valid_hash);
|
||||
(
|
||||
Payload::Invalid { .. } | Payload::InvalidBlockHash | Payload::InvalidTerminalBlock,
|
||||
_,
|
||||
)
|
||||
| (
|
||||
_,
|
||||
Payload::Invalid { .. } | Payload::InvalidBlockHash | Payload::InvalidTerminalBlock,
|
||||
) => {
|
||||
set_new_payload(new_payload_response);
|
||||
set_forkchoice_updated(forkchoice_response);
|
||||
|
||||
match self.harness.process_block(slot, block) {
|
||||
Err(error) if evaluate_error(&error) => (),
|
||||
Err(other) => {
|
||||
panic!("evaluate_error returned false with {:?}", other)
|
||||
}
|
||||
Ok(_) => panic!("block with invalid payload was imported"),
|
||||
Ok(_) => {
|
||||
// An invalid payload should only be imported initially if its status when
|
||||
// initially supplied to the EE is Valid or Syncing.
|
||||
assert!(matches!(
|
||||
new_payload_response,
|
||||
Payload::Valid | Payload::Syncing
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
assert!(
|
||||
self.harness
|
||||
.chain
|
||||
.fork_choice
|
||||
.read()
|
||||
.get_block(&block_root)
|
||||
.is_none(),
|
||||
"invalid block must not exist in fork choice"
|
||||
);
|
||||
assert!(
|
||||
self.harness.chain.get_block(&block_root).unwrap().is_none(),
|
||||
"invalid block cannot be accessed via get_block"
|
||||
);
|
||||
let block_in_forkchoice =
|
||||
self.harness.chain.fork_choice.read().get_block(&block_root);
|
||||
if let Payload::Invalid { .. } = new_payload_response {
|
||||
// A block found to be immediately invalid should not end up in fork choice.
|
||||
assert_eq!(block_in_forkchoice, None);
|
||||
|
||||
assert!(
|
||||
self.harness.chain.get_block(&block_root).unwrap().is_none(),
|
||||
"invalid block cannot be accessed via get_block"
|
||||
);
|
||||
} else {
|
||||
// A block imported and then found invalid should have an invalid status.
|
||||
assert!(block_in_forkchoice.unwrap().execution_status.is_invalid());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -317,6 +373,48 @@ fn invalid_payload_invalidates_parent() {
|
||||
assert_eq!(rig.head_info().block_root, roots[0]);
|
||||
}
|
||||
|
||||
/// Test invalidation of a payload via the fork choice updated message.
|
||||
///
|
||||
/// The `invalid_payload` argument determines the type of invalid payload: `Invalid`,
|
||||
/// `InvalidBlockHash`, etc, taking the `latest_valid_hash` as an argument.
|
||||
fn immediate_forkchoice_update_invalid_test(
|
||||
invalid_payload: impl FnOnce(Option<ExecutionBlockHash>) -> Payload,
|
||||
) {
|
||||
let mut rig = InvalidPayloadRig::new().enable_attestations();
|
||||
rig.move_to_terminal_block();
|
||||
rig.import_block(Payload::Valid); // Import a valid transition block.
|
||||
rig.move_to_first_justification(Payload::Syncing);
|
||||
|
||||
let valid_head_root = rig.import_block(Payload::Valid);
|
||||
let latest_valid_hash = Some(rig.block_hash(valid_head_root));
|
||||
|
||||
// Import a block which returns syncing when supplied via newPayload, and then
|
||||
// invalid when the forkchoice update is sent.
|
||||
rig.import_block_parametric(Payload::Syncing, invalid_payload(latest_valid_hash), |_| {
|
||||
false
|
||||
});
|
||||
|
||||
// The head should be the latest valid block.
|
||||
assert_eq!(rig.head_info().block_root, valid_head_root);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn immediate_forkchoice_update_payload_invalid() {
|
||||
immediate_forkchoice_update_invalid_test(|latest_valid_hash| Payload::Invalid {
|
||||
latest_valid_hash,
|
||||
})
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn immediate_forkchoice_update_payload_invalid_block_hash() {
|
||||
immediate_forkchoice_update_invalid_test(|_| Payload::InvalidBlockHash)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn immediate_forkchoice_update_payload_invalid_terminal_block() {
|
||||
immediate_forkchoice_update_invalid_test(|_| Payload::InvalidTerminalBlock)
|
||||
}
|
||||
|
||||
/// Ensure the client tries to exit when the justified checkpoint is invalidated.
|
||||
#[test]
|
||||
fn justified_checkpoint_becomes_invalid() {
|
||||
@@ -339,19 +437,17 @@ fn justified_checkpoint_becomes_invalid() {
|
||||
assert!(rig.harness.shutdown_reasons().is_empty());
|
||||
|
||||
// Import a block that will invalidate the justified checkpoint.
|
||||
rig.import_block_parametric(
|
||||
Payload::Invalid {
|
||||
latest_valid_hash: Some(parent_hash_of_justified),
|
||||
},
|
||||
|error| {
|
||||
matches!(
|
||||
error,
|
||||
// The block import should fail since the beacon chain knows the justified payload
|
||||
// is invalid.
|
||||
BlockError::BeaconChainError(BeaconChainError::JustifiedPayloadInvalid { .. })
|
||||
)
|
||||
},
|
||||
);
|
||||
let is_valid = Payload::Invalid {
|
||||
latest_valid_hash: Some(parent_hash_of_justified),
|
||||
};
|
||||
rig.import_block_parametric(is_valid, is_valid, |error| {
|
||||
matches!(
|
||||
error,
|
||||
// The block import should fail since the beacon chain knows the justified payload
|
||||
// is invalid.
|
||||
BlockError::BeaconChainError(BeaconChainError::JustifiedPayloadInvalid { .. })
|
||||
)
|
||||
});
|
||||
|
||||
// The beacon chain should have triggered a shutdown.
|
||||
assert_eq!(
|
||||
|
||||
@@ -2011,26 +2011,28 @@ fn weak_subjectivity_sync() {
|
||||
let seconds_per_slot = spec.seconds_per_slot;
|
||||
|
||||
// Initialise a new beacon chain from the finalized checkpoint
|
||||
let beacon_chain = BeaconChainBuilder::new(MinimalEthSpec)
|
||||
.store(store.clone())
|
||||
.custom_spec(test_spec::<E>())
|
||||
.weak_subjectivity_state(wss_state, wss_block.clone(), genesis_state)
|
||||
.unwrap()
|
||||
.logger(log.clone())
|
||||
.store_migrator_config(MigratorConfig::default().blocking())
|
||||
.dummy_eth1_backend()
|
||||
.expect("should build dummy backend")
|
||||
.testing_slot_clock(Duration::from_secs(seconds_per_slot))
|
||||
.expect("should configure testing slot clock")
|
||||
.shutdown_sender(shutdown_tx)
|
||||
.chain_config(ChainConfig::default())
|
||||
.event_handler(Some(ServerSentEventHandler::new_with_capacity(
|
||||
log.clone(),
|
||||
1,
|
||||
)))
|
||||
.monitor_validators(true, vec![], log)
|
||||
.build()
|
||||
.expect("should build");
|
||||
let beacon_chain = Arc::new(
|
||||
BeaconChainBuilder::new(MinimalEthSpec)
|
||||
.store(store.clone())
|
||||
.custom_spec(test_spec::<E>())
|
||||
.weak_subjectivity_state(wss_state, wss_block.clone(), genesis_state)
|
||||
.unwrap()
|
||||
.logger(log.clone())
|
||||
.store_migrator_config(MigratorConfig::default().blocking())
|
||||
.dummy_eth1_backend()
|
||||
.expect("should build dummy backend")
|
||||
.testing_slot_clock(Duration::from_secs(seconds_per_slot))
|
||||
.expect("should configure testing slot clock")
|
||||
.shutdown_sender(shutdown_tx)
|
||||
.chain_config(ChainConfig::default())
|
||||
.event_handler(Some(ServerSentEventHandler::new_with_capacity(
|
||||
log.clone(),
|
||||
1,
|
||||
)))
|
||||
.monitor_validators(true, vec![], log)
|
||||
.build()
|
||||
.expect("should build"),
|
||||
);
|
||||
|
||||
// Apply blocks forward to reach head.
|
||||
let chain_dump = harness.chain.chain_dump().unwrap();
|
||||
|
||||
Reference in New Issue
Block a user