diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 326d8b6c67..10506f3038 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -137,6 +137,9 @@ const MAX_PER_SLOT_FORK_CHOICE_DISTANCE: u64 = 4; pub const INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON: &str = "Justified block has an invalid execution payload."; +pub const INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON: &str = + "Finalized merge transition block is invalid."; + /// Defines the behaviour when a block/block-root for a skipped slot is requested. pub enum WhenSlotSkipped { /// If the slot is a skip slot, return `None`. @@ -528,6 +531,7 @@ impl BeaconChain { /// Even more efficient variant of `forwards_iter_block_roots` that will avoid cloning the head /// state if it isn't required for the requested range of blocks. + /// The range [start_slot, end_slot] is inclusive (ie `start_slot <= end_slot`) pub fn forwards_iter_block_roots_until( &self, start_slot: Slot, diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 0031bd2c6c..73330e7b56 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -44,7 +44,7 @@ //! ``` use crate::execution_payload::{ is_optimistic_candidate_block, validate_execution_payload_for_gossip, validate_merge_block, - PayloadNotifier, + AllowOptimisticImport, PayloadNotifier, }; use crate::snapshot_cache::PreProcessingSnapshot; use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS; @@ -1199,7 +1199,7 @@ impl ExecutionPendingBlock { // - Doing the check here means we can keep our fork-choice implementation "pure". I.e., no // calls to remote servers. if is_valid_merge_transition_block { - validate_merge_block(&chain, block.message()).await?; + validate_merge_block(&chain, block.message(), AllowOptimisticImport::Yes).await?; }; // The specification declares that this should be run *inside* `per_block_processing`, diff --git a/beacon_node/beacon_chain/src/execution_payload.rs b/beacon_node/beacon_chain/src/execution_payload.rs index fade47e1d3..3c530aaac8 100644 --- a/beacon_node/beacon_chain/src/execution_payload.rs +++ b/beacon_node/beacon_chain/src/execution_payload.rs @@ -7,6 +7,7 @@ //! So, this module contains functions that one might expect to find in other crates, but they live //! here for good reason. +use crate::otb_verification_service::OptimisticTransitionBlock; use crate::{ BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, BlockProductionError, ExecutionPayloadError, @@ -27,6 +28,12 @@ use types::*; pub type PreparePayloadResult = Result; pub type PreparePayloadHandle = JoinHandle>>; +#[derive(PartialEq)] +pub enum AllowOptimisticImport { + Yes, + No, +} + /// Used to await the result of executing payload with a remote EE. pub struct PayloadNotifier { pub chain: Arc>, @@ -146,6 +153,7 @@ async fn notify_new_payload<'a, T: BeaconChainTypes>( pub async fn validate_merge_block<'a, T: BeaconChainTypes>( chain: &Arc>, block: BeaconBlockRef<'a, T::EthSpec>, + allow_optimistic_import: AllowOptimisticImport, ) -> Result<(), BlockError> { let spec = &chain.spec; let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch()); @@ -188,13 +196,18 @@ pub async fn validate_merge_block<'a, T: BeaconChainTypes>( } .into()), None => { - if is_optimistic_candidate_block(chain, block.slot(), block.parent_root()).await? { + if allow_optimistic_import == AllowOptimisticImport::Yes + && is_optimistic_candidate_block(chain, block.slot(), block.parent_root()).await? + { debug!( chain.log, - "Optimistically accepting terminal block"; + "Optimistically importing merge transition block"; "block_hash" => ?execution_payload.parent_hash(), "msg" => "the terminal block/parent was unavailable" ); + // Store Optimistic Transition Block in Database for later Verification + OptimisticTransitionBlock::from_block(block) + .persist_in_store::(&chain.store)?; Ok(()) } else { Err(ExecutionPayloadError::UnverifiedNonOptimisticCandidate.into()) diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 57a1da9dc6..ed6c2459eb 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -28,6 +28,7 @@ mod observed_aggregates; mod observed_attesters; mod observed_block_producers; pub mod observed_operations; +pub mod otb_verification_service; mod persisted_beacon_chain; mod persisted_fork_choice; mod pre_finalization_cache; @@ -45,6 +46,7 @@ mod validator_pubkey_cache; pub use self::beacon_chain::{ AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, BeaconStore, ChainSegmentResult, CountUnrealized, ForkChoiceError, ProduceBlockVerification, StateSkipConfig, WhenSlotSkipped, + INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON, INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON, MAXIMUM_GOSSIP_CLOCK_DISPARITY, }; pub use self::beacon_snapshot::BeaconSnapshot; diff --git a/beacon_node/beacon_chain/src/otb_verification_service.rs b/beacon_node/beacon_chain/src/otb_verification_service.rs new file mode 100644 index 0000000000..805b61dd9c --- /dev/null +++ b/beacon_node/beacon_chain/src/otb_verification_service.rs @@ -0,0 +1,378 @@ +use crate::execution_payload::{validate_merge_block, AllowOptimisticImport}; +use crate::{ + BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, ExecutionPayloadError, + INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON, +}; +use itertools::process_results; +use proto_array::InvalidationOperation; +use slog::{crit, debug, error, info, warn}; +use slot_clock::SlotClock; +use ssz::{Decode, Encode}; +use ssz_derive::{Decode, Encode}; +use state_processing::per_block_processing::is_merge_transition_complete; +use std::sync::Arc; +use store::{DBColumn, Error as StoreError, HotColdDB, KeyValueStore, StoreItem}; +use task_executor::{ShutdownReason, TaskExecutor}; +use tokio::time::sleep; +use tree_hash::TreeHash; +use types::{BeaconBlockRef, EthSpec, Hash256, Slot}; +use DBColumn::OptimisticTransitionBlock as OTBColumn; + +#[derive(Clone, Debug, Decode, Encode, PartialEq)] +pub struct OptimisticTransitionBlock { + root: Hash256, + slot: Slot, +} + +impl OptimisticTransitionBlock { + // types::BeaconBlockRef<'_, ::EthSpec> + pub fn from_block(block: BeaconBlockRef) -> Self { + Self { + root: block.tree_hash_root(), + slot: block.slot(), + } + } + + pub fn root(&self) -> &Hash256 { + &self.root + } + + pub fn slot(&self) -> &Slot { + &self.slot + } + + pub fn persist_in_store(&self, store: A) -> Result<(), StoreError> + where + T: BeaconChainTypes, + A: AsRef>, + { + if store + .as_ref() + .item_exists::(&self.root)? + { + Ok(()) + } else { + store.as_ref().put_item(&self.root, self) + } + } + + pub fn remove_from_store(&self, store: A) -> Result<(), StoreError> + where + T: BeaconChainTypes, + A: AsRef>, + { + store + .as_ref() + .hot_db + .key_delete(OTBColumn.into(), self.root.as_bytes()) + } + + fn is_canonical( + &self, + chain: &BeaconChain, + ) -> Result { + Ok(chain + .forwards_iter_block_roots_until(self.slot, self.slot)? + .next() + .transpose()? + .map(|(root, _)| root) + == Some(self.root)) + } +} + +impl StoreItem for OptimisticTransitionBlock { + fn db_column() -> DBColumn { + OTBColumn + } + + fn as_store_bytes(&self) -> Vec { + self.as_ssz_bytes() + } + + fn from_store_bytes(bytes: &[u8]) -> Result { + Ok(Self::from_ssz_bytes(bytes)?) + } +} + +/// The routine is expected to run once per epoch, 1/4th through the epoch. +pub const EPOCH_DELAY_FACTOR: u32 = 4; + +/// Spawns a routine which checks the validity of any optimistically imported transition blocks +/// +/// This routine will run once per epoch, at `epoch_duration / EPOCH_DELAY_FACTOR` after +/// the start of each epoch. +/// +/// The service will not be started if there is no `execution_layer` on the `chain`. +pub fn start_otb_verification_service( + executor: TaskExecutor, + chain: Arc>, +) { + // Avoid spawning the service if there's no EL, it'll just error anyway. + if chain.execution_layer.is_some() { + executor.spawn( + async move { otb_verification_service(chain).await }, + "otb_verification_service", + ); + } +} + +pub fn load_optimistic_transition_blocks( + chain: &BeaconChain, +) -> Result, StoreError> { + process_results(chain.store.hot_db.iter_column(OTBColumn), |iter| { + iter.map(|(_, bytes)| OptimisticTransitionBlock::from_store_bytes(&bytes)) + .collect() + })? +} + +#[derive(Debug)] +pub enum Error { + ForkChoice(String), + BeaconChain(BeaconChainError), + StoreError(StoreError), + NoBlockFound(OptimisticTransitionBlock), +} + +pub async fn validate_optimistic_transition_blocks( + chain: &Arc>, + otbs: Vec, +) -> Result<(), Error> { + let finalized_slot = chain + .canonical_head + .fork_choice_read_lock() + .get_finalized_block() + .map_err(|e| Error::ForkChoice(format!("{:?}", e)))? + .slot; + + // separate otbs into + // non-canonical + // finalized canonical + // unfinalized canonical + let mut non_canonical_otbs = vec![]; + let (finalized_canonical_otbs, unfinalized_canonical_otbs) = process_results( + otbs.into_iter().map(|otb| { + otb.is_canonical(chain) + .map(|is_canonical| (otb, is_canonical)) + }), + |pair_iter| { + pair_iter + .filter_map(|(otb, is_canonical)| { + if is_canonical { + Some(otb) + } else { + non_canonical_otbs.push(otb); + None + } + }) + .partition::, _>(|otb| *otb.slot() <= finalized_slot) + }, + ) + .map_err(Error::BeaconChain)?; + + // remove non-canonical blocks that conflict with finalized checkpoint from the database + for otb in non_canonical_otbs { + if *otb.slot() <= finalized_slot { + otb.remove_from_store::(&chain.store) + .map_err(Error::StoreError)?; + } + } + + // ensure finalized canonical otb are valid, otherwise kill client + for otb in finalized_canonical_otbs { + match chain.get_block(otb.root()).await { + Ok(Some(block)) => { + match validate_merge_block(chain, block.message(), AllowOptimisticImport::No).await + { + Ok(()) => { + // merge transition block is valid, remove it from OTB + otb.remove_from_store::(&chain.store) + .map_err(Error::StoreError)?; + info!( + chain.log, + "Validated merge transition block"; + "block_root" => ?otb.root(), + "type" => "finalized" + ); + } + // The block was not able to be verified by the EL. Leave the OTB in the + // database since the EL is likely still syncing and may verify the block + // later. + Err(BlockError::ExecutionPayloadError( + ExecutionPayloadError::UnverifiedNonOptimisticCandidate, + )) => (), + Err(BlockError::ExecutionPayloadError( + ExecutionPayloadError::InvalidTerminalPoWBlock { .. }, + )) => { + // Finalized Merge Transition Block is Invalid! Kill the Client! + crit!( + chain.log, + "Finalized merge transition block is invalid!"; + "msg" => "You must use the `--purge-db` flag to clear the database and restart sync. \ + You may be on a hostile network.", + "block_hash" => ?block.canonical_root() + ); + let mut shutdown_sender = chain.shutdown_sender(); + if let Err(e) = shutdown_sender.try_send(ShutdownReason::Failure( + INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON, + )) { + crit!( + chain.log, + "Failed to shut down client"; + "error" => ?e, + "shutdown_reason" => INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON + ); + } + } + _ => {} + } + } + Ok(None) => return Err(Error::NoBlockFound(otb)), + // Our database has pruned the payload and the payload was unavailable on the EL since + // the EL is still syncing or the payload is non-canonical. + Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => (), + Err(e) => return Err(Error::BeaconChain(e)), + } + } + + // attempt to validate any non-finalized canonical otb blocks + for otb in unfinalized_canonical_otbs { + match chain.get_block(otb.root()).await { + Ok(Some(block)) => { + match validate_merge_block(chain, block.message(), AllowOptimisticImport::No).await + { + Ok(()) => { + // merge transition block is valid, remove it from OTB + otb.remove_from_store::(&chain.store) + .map_err(Error::StoreError)?; + info!( + chain.log, + "Validated merge transition block"; + "block_root" => ?otb.root(), + "type" => "not finalized" + ); + } + // The block was not able to be verified by the EL. Leave the OTB in the + // database since the EL is likely still syncing and may verify the block + // later. + Err(BlockError::ExecutionPayloadError( + ExecutionPayloadError::UnverifiedNonOptimisticCandidate, + )) => (), + Err(BlockError::ExecutionPayloadError( + ExecutionPayloadError::InvalidTerminalPoWBlock { .. }, + )) => { + // Unfinalized Merge Transition Block is Invalid -> Run process_invalid_execution_payload + warn!( + chain.log, + "Merge transition block invalid"; + "block_root" => ?otb.root() + ); + chain + .process_invalid_execution_payload( + &InvalidationOperation::InvalidateOne { + block_root: *otb.root(), + }, + ) + .await + .map_err(|e| { + warn!( + chain.log, + "Error checking merge transition block"; + "error" => ?e, + "location" => "process_invalid_execution_payload" + ); + Error::BeaconChain(e) + })?; + } + _ => {} + } + } + Ok(None) => return Err(Error::NoBlockFound(otb)), + // Our database has pruned the payload and the payload was unavailable on the EL since + // the EL is still syncing or the payload is non-canonical. + Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => (), + Err(e) => return Err(Error::BeaconChain(e)), + } + } + + Ok(()) +} + +/// Loop until any optimistically imported merge transition blocks have been verified and +/// the merge has been finalized. +async fn otb_verification_service(chain: Arc>) { + let epoch_duration = chain.slot_clock.slot_duration() * T::EthSpec::slots_per_epoch() as u32; + loop { + match chain + .slot_clock + .duration_to_next_epoch(T::EthSpec::slots_per_epoch()) + { + Some(duration) => { + let additional_delay = epoch_duration / EPOCH_DELAY_FACTOR; + sleep(duration + additional_delay).await; + + debug!( + chain.log, + "OTB verification service firing"; + ); + + if !is_merge_transition_complete( + &chain.canonical_head.cached_head().snapshot.beacon_state, + ) { + // We are pre-merge. Nothing to do yet. + continue; + } + + // load all optimistically imported transition blocks from the database + match load_optimistic_transition_blocks(chain.as_ref()) { + Ok(otbs) => { + if otbs.is_empty() { + if chain + .canonical_head + .fork_choice_read_lock() + .get_finalized_block() + .map_or(false, |block| { + block.execution_status.is_execution_enabled() + }) + { + // there are no optimistic blocks in the database, we can exit + // the service since the merge transition is finalized and we'll + // never see another transition block + break; + } else { + debug!( + chain.log, + "No optimistic transition blocks"; + "info" => "waiting for the merge transition to finalize" + ) + } + } + if let Err(e) = validate_optimistic_transition_blocks(&chain, otbs).await { + warn!( + chain.log, + "Error while validating optimistic transition blocks"; + "error" => ?e + ); + } + } + Err(e) => { + error!( + chain.log, + "Error loading optimistic transition blocks"; + "error" => ?e + ); + } + }; + } + None => { + error!(chain.log, "Failed to read slot clock"); + // If we can't read the slot clock, just wait another slot. + sleep(chain.slot_clock.slot_duration()).await; + } + }; + } + debug!( + chain.log, + "No optimistic transition blocks in database"; + "msg" => "shutting down OTB verification service" + ); +} diff --git a/beacon_node/beacon_chain/tests/payload_invalidation.rs b/beacon_node/beacon_chain/tests/payload_invalidation.rs index 4107631378..df0c61f532 100644 --- a/beacon_node/beacon_chain/tests/payload_invalidation.rs +++ b/beacon_node/beacon_chain/tests/payload_invalidation.rs @@ -1,13 +1,19 @@ #![cfg(not(debug_assertions))] +use beacon_chain::otb_verification_service::{ + load_optimistic_transition_blocks, validate_optimistic_transition_blocks, + OptimisticTransitionBlock, +}; use beacon_chain::{ canonical_head::{CachedHead, CanonicalHead}, test_utils::{BeaconChainHarness, EphemeralHarnessType}, BeaconChainError, BlockError, ExecutionPayloadError, StateSkipConfig, WhenSlotSkipped, + INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON, INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON, }; use execution_layer::{ json_structures::{JsonForkChoiceStateV1, JsonPayloadAttributesV1}, + test_utils::ExecutionBlockGenerator, ExecutionLayer, ForkChoiceState, PayloadAttributes, }; use fork_choice::{ @@ -44,7 +50,11 @@ struct InvalidPayloadRig { impl InvalidPayloadRig { fn new() -> Self { - let mut spec = E::default_spec(); + let spec = E::default_spec(); + Self::new_with_spec(spec) + } + + fn new_with_spec(mut spec: ChainSpec) -> Self { spec.altair_fork_epoch = Some(Epoch::new(0)); spec.bellatrix_fork_epoch = Some(Epoch::new(0)); @@ -1203,6 +1213,548 @@ async fn attesting_to_optimistic_head() { get_aggregated_by_slot_and_root().unwrap(); } +/// A helper struct to build out a chain of some configurable length which undergoes the merge +/// transition. +struct OptimisticTransitionSetup { + blocks: Vec>>, + execution_block_generator: ExecutionBlockGenerator, +} + +impl OptimisticTransitionSetup { + async fn new(num_blocks: usize, ttd: u64) -> Self { + let mut spec = E::default_spec(); + spec.terminal_total_difficulty = ttd.into(); + let mut rig = InvalidPayloadRig::new_with_spec(spec).enable_attestations(); + rig.move_to_terminal_block(); + + let mut blocks = Vec::with_capacity(num_blocks); + for _ in 0..num_blocks { + let root = rig.import_block(Payload::Valid).await; + let block = rig.harness.chain.get_block(&root).await.unwrap().unwrap(); + blocks.push(Arc::new(block)); + } + + let execution_block_generator = rig + .harness + .mock_execution_layer + .as_ref() + .unwrap() + .server + .execution_block_generator() + .clone(); + + Self { + blocks, + execution_block_generator, + } + } +} + +/// Build a chain which has optimistically imported a transition block. +/// +/// The initial chain will be built with respect to `block_ttd`, whilst the `rig` which imports the +/// chain will operate with respect to `rig_ttd`. This allows for testing mismatched TTDs. +async fn build_optimistic_chain( + block_ttd: u64, + rig_ttd: u64, + num_blocks: usize, +) -> InvalidPayloadRig { + let OptimisticTransitionSetup { + blocks, + execution_block_generator, + } = OptimisticTransitionSetup::new(num_blocks, block_ttd).await; + // Build a brand-new testing harness. We will apply the blocks from the previous harness to + // this one. + let mut spec = E::default_spec(); + spec.terminal_total_difficulty = rig_ttd.into(); + let rig = InvalidPayloadRig::new_with_spec(spec); + + let spec = &rig.harness.chain.spec; + let mock_execution_layer = rig.harness.mock_execution_layer.as_ref().unwrap(); + + // Ensure all the execution blocks from the first rig are available in the second rig. + *mock_execution_layer.server.execution_block_generator() = execution_block_generator; + + // Make the execution layer respond `SYNCING` to all `newPayload` requests. + mock_execution_layer + .server + .all_payloads_syncing_on_new_payload(true); + // Make the execution layer respond `SYNCING` to all `forkchoiceUpdated` requests. + mock_execution_layer + .server + .all_payloads_syncing_on_forkchoice_updated(); + // Make the execution layer respond `None` to all `getBlockByHash` requests. + mock_execution_layer + .server + .all_get_block_by_hash_requests_return_none(); + + let current_slot = std::cmp::max( + blocks[0].slot() + spec.safe_slots_to_import_optimistically, + num_blocks.into(), + ); + rig.harness.set_current_slot(current_slot); + + for block in blocks { + rig.harness + .chain + .process_block(block, CountUnrealized::True) + .await + .unwrap(); + } + + rig.harness.chain.recompute_head_at_current_slot().await; + + // Make the execution layer respond normally to `getBlockByHash` requests. + mock_execution_layer + .server + .all_get_block_by_hash_requests_return_natural_value(); + + // Perform some sanity checks to ensure that the transition happened exactly where we expected. + let pre_transition_block_root = rig + .harness + .chain + .block_root_at_slot(Slot::new(0), WhenSlotSkipped::None) + .unwrap() + .unwrap(); + let pre_transition_block = rig + .harness + .chain + .get_block(&pre_transition_block_root) + .await + .unwrap() + .unwrap(); + let post_transition_block_root = rig + .harness + .chain + .block_root_at_slot(Slot::new(1), WhenSlotSkipped::None) + .unwrap() + .unwrap(); + let post_transition_block = rig + .harness + .chain + .get_block(&post_transition_block_root) + .await + .unwrap() + .unwrap(); + assert_eq!( + pre_transition_block_root, + post_transition_block.parent_root(), + "the blocks form a single chain" + ); + assert!( + pre_transition_block + .message() + .body() + .execution_payload() + .unwrap() + .execution_payload + == <_>::default(), + "the block *has not* undergone the merge transition" + ); + assert!( + post_transition_block + .message() + .body() + .execution_payload() + .unwrap() + .execution_payload + != <_>::default(), + "the block *has* undergone the merge transition" + ); + + // Assert that the transition block was optimistically imported. + // + // Note: we're using the "fallback" check for optimistic status, so if the block was + // pre-finality then we'll just use the optimistic status of the finalized block. + assert!( + rig.harness + .chain + .canonical_head + .fork_choice_read_lock() + .is_optimistic_block(&post_transition_block_root) + .unwrap(), + "the transition block should be imported optimistically" + ); + + // Get the mock execution layer to respond to `getBlockByHash` requests normally again. + mock_execution_layer + .server + .all_get_block_by_hash_requests_return_natural_value(); + + return rig; +} + +#[tokio::test] +async fn optimistic_transition_block_valid_unfinalized() { + let ttd = 42; + let num_blocks = 16 as usize; + let rig = build_optimistic_chain(ttd, ttd, num_blocks).await; + + let post_transition_block_root = rig + .harness + .chain + .block_root_at_slot(Slot::new(1), WhenSlotSkipped::None) + .unwrap() + .unwrap(); + let post_transition_block = rig + .harness + .chain + .get_block(&post_transition_block_root) + .await + .unwrap() + .unwrap(); + + assert!( + rig.cached_head() + .finalized_checkpoint() + .epoch + .start_slot(E::slots_per_epoch()) + < post_transition_block.slot(), + "the transition block should not be finalized" + ); + + let otbs = load_optimistic_transition_blocks(&rig.harness.chain) + .expect("should load optimistic transition block from db"); + assert_eq!( + otbs.len(), + 1, + "There should be one optimistic transition block" + ); + let valid_otb = OptimisticTransitionBlock::from_block(post_transition_block.message()); + assert_eq!( + valid_otb, otbs[0], + "The optimistic transition block stored in the database should be what we expect", + ); + + validate_optimistic_transition_blocks(&rig.harness.chain, otbs) + .await + .expect("should validate fine"); + // now that the transition block has been validated, it should have been removed from the database + let otbs = load_optimistic_transition_blocks(&rig.harness.chain) + .expect("should load optimistic transition block from db"); + assert!( + otbs.is_empty(), + "The valid optimistic transition block should have been removed from the database", + ); +} + +#[tokio::test] +async fn optimistic_transition_block_valid_finalized() { + let ttd = 42; + let num_blocks = 130 as usize; + let rig = build_optimistic_chain(ttd, ttd, num_blocks).await; + + let post_transition_block_root = rig + .harness + .chain + .block_root_at_slot(Slot::new(1), WhenSlotSkipped::None) + .unwrap() + .unwrap(); + let post_transition_block = rig + .harness + .chain + .get_block(&post_transition_block_root) + .await + .unwrap() + .unwrap(); + + assert!( + rig.cached_head() + .finalized_checkpoint() + .epoch + .start_slot(E::slots_per_epoch()) + > post_transition_block.slot(), + "the transition block should be finalized" + ); + + let otbs = load_optimistic_transition_blocks(&rig.harness.chain) + .expect("should load optimistic transition block from db"); + assert_eq!( + otbs.len(), + 1, + "There should be one optimistic transition block" + ); + let valid_otb = OptimisticTransitionBlock::from_block(post_transition_block.message()); + assert_eq!( + valid_otb, otbs[0], + "The optimistic transition block stored in the database should be what we expect", + ); + + validate_optimistic_transition_blocks(&rig.harness.chain, otbs) + .await + .expect("should validate fine"); + // now that the transition block has been validated, it should have been removed from the database + let otbs = load_optimistic_transition_blocks(&rig.harness.chain) + .expect("should load optimistic transition block from db"); + assert!( + otbs.is_empty(), + "The valid optimistic transition block should have been removed from the database", + ); +} + +#[tokio::test] +async fn optimistic_transition_block_invalid_unfinalized() { + let block_ttd = 42; + let rig_ttd = 1337; + let num_blocks = 22 as usize; + let rig = build_optimistic_chain(block_ttd, rig_ttd, num_blocks).await; + + let post_transition_block_root = rig + .harness + .chain + .block_root_at_slot(Slot::new(1), WhenSlotSkipped::None) + .unwrap() + .unwrap(); + let post_transition_block = rig + .harness + .chain + .get_block(&post_transition_block_root) + .await + .unwrap() + .unwrap(); + + assert!( + rig.cached_head() + .finalized_checkpoint() + .epoch + .start_slot(E::slots_per_epoch()) + < post_transition_block.slot(), + "the transition block should not be finalized" + ); + + let otbs = load_optimistic_transition_blocks(&rig.harness.chain) + .expect("should load optimistic transition block from db"); + assert_eq!( + otbs.len(), + 1, + "There should be one optimistic transition block" + ); + + let invalid_otb = OptimisticTransitionBlock::from_block(post_transition_block.message()); + assert_eq!( + invalid_otb, otbs[0], + "The optimistic transition block stored in the database should be what we expect", + ); + + // No shutdown should've been triggered. + assert_eq!(rig.harness.shutdown_reasons(), vec![]); + // It shouldn't be known as invalid yet + assert!(!rig + .execution_status(post_transition_block_root) + .is_invalid()); + + validate_optimistic_transition_blocks(&rig.harness.chain, otbs) + .await + .unwrap(); + + // Still no shutdown should've been triggered. + assert_eq!(rig.harness.shutdown_reasons(), vec![]); + // It should be marked invalid now + assert!(rig + .execution_status(post_transition_block_root) + .is_invalid()); + + // the invalid merge transition block should NOT have been removed from the database + let otbs = load_optimistic_transition_blocks(&rig.harness.chain) + .expect("should load optimistic transition block from db"); + assert_eq!( + otbs.len(), + 1, + "The invalid merge transition block should still be in the database", + ); + assert_eq!( + invalid_otb, otbs[0], + "The optimistic transition block stored in the database should be what we expect", + ); +} + +#[tokio::test] +async fn optimistic_transition_block_invalid_unfinalized_syncing_ee() { + let block_ttd = 42; + let rig_ttd = 1337; + let num_blocks = 22 as usize; + let rig = build_optimistic_chain(block_ttd, rig_ttd, num_blocks).await; + + let post_transition_block_root = rig + .harness + .chain + .block_root_at_slot(Slot::new(1), WhenSlotSkipped::None) + .unwrap() + .unwrap(); + let post_transition_block = rig + .harness + .chain + .get_block(&post_transition_block_root) + .await + .unwrap() + .unwrap(); + + assert!( + rig.cached_head() + .finalized_checkpoint() + .epoch + .start_slot(E::slots_per_epoch()) + < post_transition_block.slot(), + "the transition block should not be finalized" + ); + + let otbs = load_optimistic_transition_blocks(&rig.harness.chain) + .expect("should load optimistic transition block from db"); + assert_eq!( + otbs.len(), + 1, + "There should be one optimistic transition block" + ); + + let invalid_otb = OptimisticTransitionBlock::from_block(post_transition_block.message()); + assert_eq!( + invalid_otb, otbs[0], + "The optimistic transition block stored in the database should be what we expect", + ); + + // No shutdown should've been triggered. + assert_eq!(rig.harness.shutdown_reasons(), vec![]); + // It shouldn't be known as invalid yet + assert!(!rig + .execution_status(post_transition_block_root) + .is_invalid()); + + // Make the execution layer respond `None` to all `getBlockByHash` requests to simulate a + // syncing EE. + let mock_execution_layer = rig.harness.mock_execution_layer.as_ref().unwrap(); + mock_execution_layer + .server + .all_get_block_by_hash_requests_return_none(); + + validate_optimistic_transition_blocks(&rig.harness.chain, otbs) + .await + .unwrap(); + + // Still no shutdown should've been triggered. + assert_eq!(rig.harness.shutdown_reasons(), vec![]); + + // It should still be marked as optimistic. + assert!(rig + .execution_status(post_transition_block_root) + .is_optimistic()); + + // the optimistic merge transition block should NOT have been removed from the database + let otbs = load_optimistic_transition_blocks(&rig.harness.chain) + .expect("should load optimistic transition block from db"); + assert_eq!( + otbs.len(), + 1, + "The optimistic merge transition block should still be in the database", + ); + assert_eq!( + invalid_otb, otbs[0], + "The optimistic transition block stored in the database should be what we expect", + ); + + // Allow the EL to respond to `getBlockByHash`, as if it has finished syncing. + mock_execution_layer + .server + .all_get_block_by_hash_requests_return_natural_value(); + + validate_optimistic_transition_blocks(&rig.harness.chain, otbs) + .await + .unwrap(); + + // Still no shutdown should've been triggered. + assert_eq!(rig.harness.shutdown_reasons(), vec![]); + // It should be marked invalid now + assert!(rig + .execution_status(post_transition_block_root) + .is_invalid()); + + // the invalid merge transition block should NOT have been removed from the database + let otbs = load_optimistic_transition_blocks(&rig.harness.chain) + .expect("should load optimistic transition block from db"); + assert_eq!( + otbs.len(), + 1, + "The invalid merge transition block should still be in the database", + ); + assert_eq!( + invalid_otb, otbs[0], + "The optimistic transition block stored in the database should be what we expect", + ); +} + +#[tokio::test] +async fn optimistic_transition_block_invalid_finalized() { + let block_ttd = 42; + let rig_ttd = 1337; + let num_blocks = 130 as usize; + let rig = build_optimistic_chain(block_ttd, rig_ttd, num_blocks).await; + + let post_transition_block_root = rig + .harness + .chain + .block_root_at_slot(Slot::new(1), WhenSlotSkipped::None) + .unwrap() + .unwrap(); + let post_transition_block = rig + .harness + .chain + .get_block(&post_transition_block_root) + .await + .unwrap() + .unwrap(); + + assert!( + rig.cached_head() + .finalized_checkpoint() + .epoch + .start_slot(E::slots_per_epoch()) + > post_transition_block.slot(), + "the transition block should be finalized" + ); + + let otbs = load_optimistic_transition_blocks(&rig.harness.chain) + .expect("should load optimistic transition block from db"); + + assert_eq!( + otbs.len(), + 1, + "There should be one optimistic transition block" + ); + + let invalid_otb = OptimisticTransitionBlock::from_block(post_transition_block.message()); + assert_eq!( + invalid_otb, otbs[0], + "The optimistic transition block stored in the database should be what we expect", + ); + + // No shutdown should've been triggered yet. + assert_eq!(rig.harness.shutdown_reasons(), vec![]); + + validate_optimistic_transition_blocks(&rig.harness.chain, otbs) + .await + .expect("should invalidate merge transition block and shutdown the client"); + + // The beacon chain should have triggered a shutdown. + assert_eq!( + rig.harness.shutdown_reasons(), + vec![ShutdownReason::Failure( + INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON + )] + ); + + // the invalid merge transition block should NOT have been removed from the database + let otbs = load_optimistic_transition_blocks(&rig.harness.chain) + .expect("should load optimistic transition block from db"); + assert_eq!( + otbs.len(), + 1, + "The invalid merge transition block should still be in the database", + ); + assert_eq!( + invalid_otb, otbs[0], + "The optimistic transition block stored in the database should be what we expect", + ); +} + /// Helper for running tests where we generate a chain with an invalid head and then some /// `fork_blocks` to recover it. struct InvalidHeadSetup { diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index b7f06183f1..4de28d8368 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -1,6 +1,7 @@ use crate::config::{ClientGenesis, Config as ClientConfig}; use crate::notifier::spawn_notifier; use crate::Client; +use beacon_chain::otb_verification_service::start_otb_verification_service; use beacon_chain::proposer_prep_service::start_proposer_prep_service; use beacon_chain::schema_change::migrate_schema; use beacon_chain::{ @@ -728,6 +729,7 @@ where } start_proposer_prep_service(runtime_context.executor.clone(), beacon_chain.clone()); + start_otb_verification_service(runtime_context.executor.clone(), beacon_chain.clone()); } Ok(Client { diff --git a/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs b/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs index 6935c88f22..3620a02dfb 100644 --- a/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs +++ b/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs @@ -105,6 +105,7 @@ pub struct PoWBlock { pub timestamp: u64, } +#[derive(Clone)] pub struct ExecutionBlockGenerator { /* * Common database diff --git a/beacon_node/execution_layer/src/test_utils/handle_rpc.rs b/beacon_node/execution_layer/src/test_utils/handle_rpc.rs index eceb50df23..975f09fa5e 100644 --- a/beacon_node/execution_layer/src/test_utils/handle_rpc.rs +++ b/beacon_node/execution_layer/src/test_utils/handle_rpc.rs @@ -48,6 +48,12 @@ pub async fn handle_rpc( s.parse() .map_err(|e| format!("unable to parse hash: {:?}", e)) })?; + + // If we have a static response set, just return that. + if let Some(response) = *ctx.static_get_block_by_hash_response.lock() { + return Ok(serde_json::to_value(response).unwrap()); + } + let full_tx = params .get(1) .and_then(JsonValue::as_bool) diff --git a/beacon_node/execution_layer/src/test_utils/mod.rs b/beacon_node/execution_layer/src/test_utils/mod.rs index 2463153951..462e34e910 100644 --- a/beacon_node/execution_layer/src/test_utils/mod.rs +++ b/beacon_node/execution_layer/src/test_utils/mod.rs @@ -2,7 +2,7 @@ use crate::engine_api::auth::JwtKey; use crate::engine_api::{ - auth::Auth, http::JSONRPC_VERSION, PayloadStatusV1, PayloadStatusV1Status, + auth::Auth, http::JSONRPC_VERSION, ExecutionBlock, PayloadStatusV1, PayloadStatusV1Status, }; use bytes::Bytes; use environment::null_logger; @@ -96,6 +96,7 @@ impl MockServer { preloaded_responses, static_new_payload_response: <_>::default(), static_forkchoice_updated_response: <_>::default(), + static_get_block_by_hash_response: <_>::default(), _phantom: PhantomData, }); @@ -317,6 +318,16 @@ impl MockServer { self.set_forkchoice_updated_response(Self::invalid_terminal_block_status()); } + /// This will make the node appear like it is syncing. + pub fn all_get_block_by_hash_requests_return_none(&self) { + *self.ctx.static_get_block_by_hash_response.lock() = Some(None); + } + + /// The node will respond "naturally"; it will return blocks if they're known to it. + pub fn all_get_block_by_hash_requests_return_natural_value(&self) { + *self.ctx.static_get_block_by_hash_response.lock() = None; + } + /// Disables any static payload responses so the execution block generator will do its own /// verification. pub fn full_payload_verification(&self) { @@ -406,6 +417,7 @@ pub struct Context { pub previous_request: Arc>>, pub static_new_payload_response: Arc>>, pub static_forkchoice_updated_response: Arc>>, + pub static_get_block_by_hash_response: Arc>>>, pub _phantom: PhantomData, } diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 364bda2cc4..75aeca058b 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -208,6 +208,9 @@ pub enum DBColumn { BeaconRandaoMixes, #[strum(serialize = "dht")] DhtEnrs, + /// For Optimistically Imported Merge Transition Blocks + #[strum(serialize = "otb")] + OptimisticTransitionBlock, } /// A block from the database, which might have an execution payload or not. diff --git a/beacon_node/store/src/memory_store.rs b/beacon_node/store/src/memory_store.rs index 3ff39c67f7..1473f59a4e 100644 --- a/beacon_node/store/src/memory_store.rs +++ b/beacon_node/store/src/memory_store.rs @@ -1,14 +1,17 @@ use super::{Error, ItemStore, KeyValueStore, KeyValueStoreOp}; +use crate::{ColumnIter, DBColumn}; use parking_lot::{Mutex, MutexGuard, RwLock}; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::marker::PhantomData; use types::*; type DBHashMap = HashMap, Vec>; +type DBKeyMap = HashMap, HashSet>>; /// A thread-safe `HashMap` wrapper. pub struct MemoryStore { db: RwLock, + col_keys: RwLock, transaction_mutex: Mutex<()>, _phantom: PhantomData, } @@ -18,6 +21,7 @@ impl MemoryStore { pub fn open() -> Self { Self { db: RwLock::new(HashMap::new()), + col_keys: RwLock::new(HashMap::new()), transaction_mutex: Mutex::new(()), _phantom: PhantomData, } @@ -41,6 +45,11 @@ impl KeyValueStore for MemoryStore { fn put_bytes(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> { let column_key = Self::get_key_for_col(col, key); self.db.write().insert(column_key, val.to_vec()); + self.col_keys + .write() + .entry(col.as_bytes().to_vec()) + .or_insert_with(HashSet::new) + .insert(key.to_vec()); Ok(()) } @@ -63,6 +72,10 @@ impl KeyValueStore for MemoryStore { fn key_delete(&self, col: &str, key: &[u8]) -> Result<(), Error> { let column_key = Self::get_key_for_col(col, key); self.db.write().remove(&column_key); + self.col_keys + .write() + .get_mut(&col.as_bytes().to_vec()) + .map(|set| set.remove(key)); Ok(()) } @@ -81,6 +94,26 @@ impl KeyValueStore for MemoryStore { Ok(()) } + // pub type ColumnIter<'a> = Box), Error>> + 'a>; + fn iter_column(&self, column: DBColumn) -> ColumnIter { + let col = column.as_str(); + if let Some(keys) = self + .col_keys + .read() + .get(col.as_bytes()) + .map(|set| set.iter().cloned().collect::>()) + { + Box::new(keys.into_iter().filter_map(move |key| { + let hash = Hash256::from_slice(&key); + self.get_bytes(col, &key) + .transpose() + .map(|res| res.map(|bytes| (hash, bytes))) + })) + } else { + Box::new(std::iter::empty()) + } + } + fn begin_rw_transaction(&self) -> MutexGuard<()> { self.transaction_mutex.lock() }