diff --git a/Cargo.lock b/Cargo.lock index b5d77d235d..a35902c192 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2524,6 +2524,7 @@ dependencies = [ "lighthouse_metrics", "lighthouse_network", "lighthouse_version", + "logging", "network", "parking_lot 0.12.0", "safe_arith", diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 60670c8490..bbb9f8cb82 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -18,6 +18,7 @@ use crate::errors::{BeaconChainError as Error, BlockProductionError}; use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend}; use crate::events::ServerSentEventHandler; use crate::execution_payload::get_execution_payload; +use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx, ForkChoiceWaitResult}; use crate::head_tracker::HeadTracker; use crate::historical_blocks::HistoricalBlockError; use crate::migrate::BackgroundMigrator; @@ -339,6 +340,10 @@ pub struct BeaconChain { /// A state-machine that is updated with information from the network and chooses a canonical /// head block. pub fork_choice: RwLock>, + /// Transmitter used to indicate that slot-start fork choice has completed running. + pub fork_choice_signal_tx: Option, + /// Receiver used by block production to wait on slot-start fork choice. + pub fork_choice_signal_rx: Option, /// A handler for events generated by the beacon chain. This is only initialized when the /// HTTP server is enabled. pub event_handler: Option>, @@ -2952,12 +2957,64 @@ impl BeaconChain { Ok(block_root) } + /// If configured, wait for the fork choice run at the start of the slot to complete. + fn wait_for_fork_choice_before_block_production( + self: &Arc, + slot: Slot, + ) -> Result<(), BlockProductionError> { + if let Some(rx) = &self.fork_choice_signal_rx { + let current_slot = self + .slot() + .map_err(|_| BlockProductionError::UnableToReadSlot)?; + + let timeout = Duration::from_millis(self.config.fork_choice_before_proposal_timeout_ms); + + if slot == current_slot || slot == current_slot + 1 { + match rx.wait_for_fork_choice(slot, timeout) { + ForkChoiceWaitResult::Success(fc_slot) => { + debug!( + self.log, + "Fork choice successfully updated before block production"; + "slot" => slot, + "fork_choice_slot" => fc_slot, + ); + } + ForkChoiceWaitResult::Behind(fc_slot) => { + warn!( + self.log, + "Fork choice notifier out of sync with block production"; + "fork_choice_slot" => fc_slot, + "slot" => slot, + "message" => "this block may be orphaned", + ); + } + ForkChoiceWaitResult::TimeOut => { + warn!( + self.log, + "Timed out waiting for fork choice before proposal"; + "message" => "this block may be orphaned", + ); + } + } + } else { + error!( + self.log, + "Producing block at incorrect slot"; + "block_slot" => slot, + "current_slot" => current_slot, + "message" => "check clock sync, this block may be orphaned", + ); + } + } + Ok(()) + } + /// Produce a new block at the given `slot`. /// /// The produced block will not be inherently valid, it must be signed by a block producer. /// Block signing is out of the scope of this function and should be done by a separate program. pub fn produce_block>( - &self, + self: &Arc, randao_reveal: Signature, slot: Slot, validator_graffiti: Option, @@ -2972,7 +3029,7 @@ impl BeaconChain { /// Same as `produce_block` but allowing for configuration of RANDAO-verification. pub fn produce_block_with_verification>( - &self, + self: &Arc, randao_reveal: Signature, slot: Slot, validator_graffiti: Option, @@ -2981,6 +3038,10 @@ impl BeaconChain { metrics::inc_counter(&metrics::BLOCK_PRODUCTION_REQUESTS); let _complete_timer = metrics::start_timer(&metrics::BLOCK_PRODUCTION_TIMES); + let fork_choice_timer = metrics::start_timer(&metrics::BLOCK_PRODUCTION_FORK_CHOICE_TIMES); + self.wait_for_fork_choice_before_block_production(slot)?; + drop(fork_choice_timer); + // Producing a block requires the tree hash cache, so clone a full state corresponding to // the head from the snapshot cache. Unfortunately we can't move the snapshot out of the // cache (which would be fast), because we need to re-process the block after it has been @@ -3362,10 +3423,18 @@ impl BeaconChain { /// Execute the fork choice algorithm and enthrone the result as the canonical head. pub fn fork_choice(self: &Arc) -> Result<(), Error> { + self.fork_choice_at_slot(self.slot()?) + } + + /// Execute fork choice at `slot`, processing queued attestations from `slot - 1` and earlier. + /// + /// The `slot` is not verified in any way, callers should ensure it corresponds to at most + /// one slot ahead of the current wall-clock slot. + pub fn fork_choice_at_slot(self: &Arc, slot: Slot) -> Result<(), Error> { metrics::inc_counter(&metrics::FORK_CHOICE_REQUESTS); let _timer = metrics::start_timer(&metrics::FORK_CHOICE_TIMES); - let result = self.fork_choice_internal(); + let result = self.fork_choice_internal(slot); if result.is_err() { metrics::inc_counter(&metrics::FORK_CHOICE_ERRORS); @@ -3374,13 +3443,13 @@ impl BeaconChain { result } - fn fork_choice_internal(self: &Arc) -> Result<(), Error> { + fn fork_choice_internal(self: &Arc, slot: Slot) -> Result<(), Error> { // Atomically obtain the head block root and the finalized block. let (beacon_block_root, finalized_block) = { let mut fork_choice = self.fork_choice.write(); // Determine the root of the block that is the head of the chain. - let beacon_block_root = fork_choice.get_head(self.slot()?, &self.spec)?; + let beacon_block_root = fork_choice.get_head(slot, &self.spec)?; (beacon_block_root, fork_choice.get_finalized_block()?) }; @@ -3752,6 +3821,8 @@ impl BeaconChain { } // Update the execution layer. + // Always use the wall-clock slot to update the execution engine rather than the `slot` + // passed in. if let Err(e) = self.update_execution_engine_forkchoice_blocking(self.slot()?) { crit!( self.log, @@ -4005,8 +4076,6 @@ impl BeaconChain { "prepare_slot" => prepare_slot ); - // Use the blocking method here so that we don't form a queue of these functions when - // routinely calling them. self.update_execution_engine_forkchoice_async(current_slot) .await?; } @@ -4336,11 +4405,32 @@ impl BeaconChain { } /// Called by the timer on every slot. - /// - /// Performs slot-based pruning. - pub fn per_slot_task(&self) { + pub fn per_slot_task(self: &Arc) { trace!(self.log, "Running beacon chain per slot tasks"); if let Some(slot) = self.slot_clock.now() { + // Run fork choice and signal to any waiting task that it has completed. + if let Err(e) = self.fork_choice() { + error!( + self.log, + "Fork choice error at slot start"; + "error" => ?e, + "slot" => slot, + ); + } + + // Send the notification regardless of fork choice success, this is a "best effort" + // notification and we don't want block production to hit the timeout in case of error. + if let Some(tx) = &self.fork_choice_signal_tx { + if let Err(e) = tx.notify_fork_choice_complete(slot) { + warn!( + self.log, + "Error signalling fork choice waiter"; + "error" => ?e, + "slot" => slot, + ); + } + } + self.naive_aggregation_pool.write().prune(slot); self.block_times_cache.write().prune(slot); } diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 2efc972ed5..361246b4d3 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -1,5 +1,6 @@ use crate::beacon_chain::{BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, OP_POOL_DB_KEY}; use crate::eth1_chain::{CachingEth1Backend, SszEth1}; +use crate::fork_choice_signal::ForkChoiceSignalTx; use crate::fork_revert::{reset_fork_choice_to_finalization, revert_to_fork_boundary}; use crate::head_tracker::HeadTracker; use crate::migrate::{BackgroundMigrator, MigratorConfig}; @@ -694,6 +695,16 @@ where ); } + // If enabled, set up the fork choice signaller. + let (fork_choice_signal_tx, fork_choice_signal_rx) = + if self.chain_config.fork_choice_before_proposal_timeout_ms != 0 { + let tx = ForkChoiceSignalTx::new(); + let rx = tx.get_receiver(); + (Some(tx), Some(rx)) + } else { + (None, None) + }; + // Store the `PersistedBeaconChain` in the database atomically with the metadata so that on // restart we can correctly detect the presence of an initialized database. // @@ -752,6 +763,8 @@ where genesis_block_root, genesis_state_root, fork_choice: RwLock::new(fork_choice), + fork_choice_signal_tx, + fork_choice_signal_rx, event_handler: self.event_handler, head_tracker, snapshot_cache: TimeoutRwLock::new(SnapshotCache::new( diff --git a/beacon_node/beacon_chain/src/chain_config.rs b/beacon_node/beacon_chain/src/chain_config.rs index 4aee06d468..36c2f41d9d 100644 --- a/beacon_node/beacon_chain/src/chain_config.rs +++ b/beacon_node/beacon_chain/src/chain_config.rs @@ -1,6 +1,8 @@ use serde_derive::{Deserialize, Serialize}; use types::Checkpoint; +pub const DEFAULT_FORK_CHOICE_BEFORE_PROPOSAL_TIMEOUT: u64 = 250; + #[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)] pub struct ChainConfig { /// Maximum number of slots to skip when importing a consensus message (e.g., block, @@ -18,6 +20,10 @@ pub struct ChainConfig { pub enable_lock_timeouts: bool, /// The max size of a message that can be sent over the network. pub max_network_size: usize, + /// Number of milliseconds to wait for fork choice before proposing a block. + /// + /// If set to 0 then block proposal will not wait for fork choice at all. + pub fork_choice_before_proposal_timeout_ms: u64, } impl Default for ChainConfig { @@ -28,6 +34,7 @@ impl Default for ChainConfig { reconstruct_historic_states: false, enable_lock_timeouts: true, max_network_size: 10 * 1_048_576, // 10M + fork_choice_before_proposal_timeout_ms: DEFAULT_FORK_CHOICE_BEFORE_PROPOSAL_TIMEOUT, } } } diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 280ec3fac3..834823992a 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -185,6 +185,10 @@ pub enum BeaconChainError { }, RuntimeShutdown, ProcessInvalidExecutionPayload(JoinError), + ForkChoiceSignalOutOfOrder { + current: Slot, + latest: Slot, + }, } easy_from_to!(SlotProcessingError, BeaconChainError); @@ -234,6 +238,7 @@ pub enum BlockProductionError { FailedToReadFinalizedBlock(store::Error), MissingFinalizedBlock(Hash256), BlockTooLarge(usize), + ForkChoiceError(BeaconChainError), } easy_from_to!(BlockProcessingError, BlockProductionError); diff --git a/beacon_node/beacon_chain/src/fork_choice_signal.rs b/beacon_node/beacon_chain/src/fork_choice_signal.rs new file mode 100644 index 0000000000..fd92de661d --- /dev/null +++ b/beacon_node/beacon_chain/src/fork_choice_signal.rs @@ -0,0 +1,97 @@ +//! Concurrency helpers for synchronising block proposal with fork choice. +//! +//! The transmitter provides a way for a thread runnning fork choice on a schedule to signal +//! to the receiver that fork choice has been updated for a given slot. +use crate::BeaconChainError; +use parking_lot::{Condvar, Mutex}; +use std::sync::Arc; +use std::time::Duration; +use types::Slot; + +/// Sender, for use by the per-slot task timer. +pub struct ForkChoiceSignalTx { + pair: Arc<(Mutex, Condvar)>, +} + +/// Receiver, for use by the beacon chain waiting on fork choice to complete. +pub struct ForkChoiceSignalRx { + pair: Arc<(Mutex, Condvar)>, +} + +pub enum ForkChoiceWaitResult { + /// Successfully reached a slot greater than or equal to the awaited slot. + Success(Slot), + /// Fork choice was updated to a lower slot, indicative of lag or processing delays. + Behind(Slot), + /// Timed out waiting for the fork choice update from the sender. + TimeOut, +} + +impl ForkChoiceSignalTx { + pub fn new() -> Self { + let pair = Arc::new((Mutex::new(Slot::new(0)), Condvar::new())); + Self { pair } + } + + pub fn get_receiver(&self) -> ForkChoiceSignalRx { + ForkChoiceSignalRx { + pair: self.pair.clone(), + } + } + + /// Signal to the receiver that fork choice has been updated to `slot`. + /// + /// Return an error if the provided `slot` is strictly less than any previously provided slot. + pub fn notify_fork_choice_complete(&self, slot: Slot) -> Result<(), BeaconChainError> { + let &(ref lock, ref condvar) = &*self.pair; + + let mut current_slot = lock.lock(); + + if slot < *current_slot { + return Err(BeaconChainError::ForkChoiceSignalOutOfOrder { + current: *current_slot, + latest: slot, + }); + } else { + *current_slot = slot; + } + + // We use `notify_all` because there may be multiple block proposals waiting simultaneously. + // Usually there'll be 0-1. + condvar.notify_all(); + + Ok(()) + } +} + +impl Default for ForkChoiceSignalTx { + fn default() -> Self { + Self::new() + } +} + +impl ForkChoiceSignalRx { + pub fn wait_for_fork_choice(&self, slot: Slot, timeout: Duration) -> ForkChoiceWaitResult { + let &(ref lock, ref condvar) = &*self.pair; + + let mut current_slot = lock.lock(); + + // Wait for `current_slot >= slot`. + // + // Do not loop and wait, if we receive an update for the wrong slot then something is + // quite out of whack and we shouldn't waste more time waiting. + if *current_slot < slot { + let timeout_result = condvar.wait_for(&mut current_slot, timeout); + + if timeout_result.timed_out() { + return ForkChoiceWaitResult::TimeOut; + } + } + + if *current_slot >= slot { + ForkChoiceWaitResult::Success(*current_slot) + } else { + ForkChoiceWaitResult::Behind(*current_slot) + } + } +} diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 65908547ff..579020b1d1 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -15,6 +15,7 @@ mod errors; pub mod eth1_chain; pub mod events; mod execution_payload; +pub mod fork_choice_signal; pub mod fork_revert; mod head_tracker; pub mod historical_blocks; diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 41b7604532..4d0f63674a 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -90,6 +90,10 @@ lazy_static! { ); pub static ref BLOCK_PRODUCTION_TIMES: Result = try_create_histogram("beacon_block_production_seconds", "Full runtime of block production"); + pub static ref BLOCK_PRODUCTION_FORK_CHOICE_TIMES: Result = try_create_histogram( + "beacon_block_production_fork_choice_seconds", + "Time taken to run fork choice before block production" + ); pub static ref BLOCK_PRODUCTION_STATE_LOAD_TIMES: Result = try_create_histogram( "beacon_block_production_state_load_seconds", "Time taken to load the base state for block production" diff --git a/beacon_node/beacon_chain/src/state_advance_timer.rs b/beacon_node/beacon_chain/src/state_advance_timer.rs index 71934d844d..7216ac1118 100644 --- a/beacon_node/beacon_chain/src/state_advance_timer.rs +++ b/beacon_node/beacon_chain/src/state_advance_timer.rs @@ -27,7 +27,7 @@ use std::sync::{ Arc, }; use task_executor::TaskExecutor; -use tokio::time::sleep; +use tokio::time::{sleep, sleep_until, Instant}; use types::{AttestationShufflingId, EthSpec, Hash256, RelativeEpoch, Slot}; /// If the head slot is more than `MAX_ADVANCE_DISTANCE` from the current slot, then don't perform @@ -105,8 +105,8 @@ async fn state_advance_timer( let slot_duration = slot_clock.slot_duration(); loop { - match beacon_chain.slot_clock.duration_to_next_slot() { - Some(duration) => sleep(duration + (slot_duration / 4) * 3).await, + let duration_to_next_slot = match beacon_chain.slot_clock.duration_to_next_slot() { + Some(duration) => duration, None => { error!(log, "Failed to read slot clock"); // If we can't read the slot clock, just wait another slot. @@ -115,7 +115,45 @@ async fn state_advance_timer( } }; - // Only start spawn the state advance task if the lock was previously free. + // Run the state advance 3/4 of the way through the slot (9s on mainnet). + let state_advance_offset = slot_duration / 4; + let state_advance_instant = if duration_to_next_slot > state_advance_offset { + Instant::now() + duration_to_next_slot - state_advance_offset + } else { + // Skip the state advance for the current slot and wait until the next one. + Instant::now() + duration_to_next_slot + slot_duration - state_advance_offset + }; + + // Run fork choice 23/24s of the way through the slot (11.5s on mainnet). + // We need to run after the state advance, so use the same condition as above. + let fork_choice_offset = slot_duration / 24; + let fork_choice_instant = if duration_to_next_slot > state_advance_offset { + Instant::now() + duration_to_next_slot - fork_choice_offset + } else { + Instant::now() + duration_to_next_slot + slot_duration - fork_choice_offset + }; + + // Wait for the state advance. + sleep_until(state_advance_instant).await; + + // Compute the current slot here at approx 3/4 through the slot. Even though this slot is + // only used by fork choice we need to calculate it here rather than after the state + // advance, in case the state advance flows over into the next slot. + let current_slot = match beacon_chain.slot() { + Ok(slot) => slot, + Err(e) => { + warn!( + log, + "Unable to determine slot in state advance timer"; + "error" => ?e + ); + // If we can't read the slot clock, just wait another slot. + sleep(slot_duration).await; + continue; + } + }; + + // Only spawn the state advance task if the lock was previously free. if !is_running.lock() { let log = log.clone(); let beacon_chain = beacon_chain.clone(); @@ -163,6 +201,40 @@ async fn state_advance_timer( "msg" => "system resources may be overloaded" ) } + + // Run fork choice pre-emptively for the next slot. This processes most of the attestations + // from this slot off the hot path of block verification and production. + // Wait for the fork choice instant (which may already be past). + sleep_until(fork_choice_instant).await; + + let log = log.clone(); + let beacon_chain = beacon_chain.clone(); + let next_slot = current_slot + 1; + executor.spawn_blocking( + move || { + if let Err(e) = beacon_chain.fork_choice_at_slot(next_slot) { + warn!( + log, + "Error updating fork choice for next slot"; + "error" => ?e, + "slot" => next_slot, + ); + } + + // Signal block proposal for the next slot (if it happens to be waiting). + if let Some(tx) = &beacon_chain.fork_choice_signal_tx { + if let Err(e) = tx.notify_fork_choice_complete(next_slot) { + warn!( + log, + "Error signalling fork choice waiter"; + "error" => ?e, + "slot" => next_slot, + ); + } + } + }, + "fork_choice_advance", + ); } } @@ -193,13 +265,6 @@ fn advance_head( } } - // Run fork choice so we get the latest view of the head. - // - // This is useful since it's quite likely that the last time we ran fork choice was shortly - // after receiving the latest gossip block, but not necessarily after we've received the - // majority of attestations. - beacon_chain.fork_choice()?; - let head_root = beacon_chain.head_info()?.block_root; let (head_slot, head_state_root, mut state) = match beacon_chain diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 2dc1d0301d..69ed413fd4 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -61,7 +61,7 @@ pub type BaseHarnessType = pub type DiskHarnessType = BaseHarnessType, LevelDB>; pub type EphemeralHarnessType = BaseHarnessType, MemoryStore>; -type BoxedMutator = Box< +pub type BoxedMutator = Box< dyn FnOnce( BeaconChainBuilder>, ) -> BeaconChainBuilder>, @@ -586,18 +586,7 @@ where // different blocks each time. let graffiti = Graffiti::from(self.rng.lock().gen::<[u8; 32]>()); - let randao_reveal = { - let epoch = slot.epoch(E::slots_per_epoch()); - let domain = self.spec.get_domain( - epoch, - Domain::Randao, - &state.fork(), - state.genesis_validators_root(), - ); - let message = epoch.signing_root(domain); - let sk = &self.validator_keypairs[proposer_index].sk; - sk.sign(message) - }; + let randao_reveal = self.sign_randao_reveal(&state, proposer_index, slot); let (block, state) = self .chain @@ -645,18 +634,7 @@ where // different blocks each time. let graffiti = Graffiti::from(self.rng.lock().gen::<[u8; 32]>()); - let randao_reveal = { - let epoch = slot.epoch(E::slots_per_epoch()); - let domain = self.spec.get_domain( - epoch, - Domain::Randao, - &state.fork(), - state.genesis_validators_root(), - ); - let message = epoch.signing_root(domain); - let sk = &self.validator_keypairs[proposer_index].sk; - sk.sign(message) - }; + let randao_reveal = self.sign_randao_reveal(&state, proposer_index, slot); let pre_state = state.clone(); @@ -682,6 +660,25 @@ where (signed_block, pre_state) } + /// Create a randao reveal for a block at `slot`. + pub fn sign_randao_reveal( + &self, + state: &BeaconState, + proposer_index: usize, + slot: Slot, + ) -> Signature { + let epoch = slot.epoch(E::slots_per_epoch()); + let domain = self.spec.get_domain( + epoch, + Domain::Randao, + &state.fork(), + state.genesis_validators_root(), + ); + let message = epoch.signing_root(domain); + let sk = &self.validator_keypairs[proposer_index].sk; + sk.sign(message) + } + /// Produces an "unaggregated" attestation for the given `slot` and `index` that attests to /// `beacon_block_root`. The provided `state` should match the `block.state_root` for the /// `block` identified by `beacon_block_root`. diff --git a/beacon_node/http_api/Cargo.toml b/beacon_node/http_api/Cargo.toml index f982f0d022..a34618c2ef 100644 --- a/beacon_node/http_api/Cargo.toml +++ b/beacon_node/http_api/Cargo.toml @@ -38,6 +38,7 @@ store = { path = "../store" } environment = { path = "../../lighthouse/environment" } tree_hash = "0.4.1" sensitive_url = { path = "../../common/sensitive_url" } +logging = { path = "../../common/logging" } [[test]] name = "bn_http_api_tests" diff --git a/beacon_node/http_api/tests/interactive_tests.rs b/beacon_node/http_api/tests/interactive_tests.rs index 64ce3b6566..8b12aa4a5b 100644 --- a/beacon_node/http_api/tests/interactive_tests.rs +++ b/beacon_node/http_api/tests/interactive_tests.rs @@ -1,7 +1,9 @@ //! Generic tests that make use of the (newer) `InteractiveApiTester` use crate::common::*; +use beacon_chain::test_utils::{AttestationStrategy, BlockStrategy}; use eth2::types::DepositContractData; -use types::{EthSpec, MainnetEthSpec}; +use tree_hash::TreeHash; +use types::{EthSpec, FullPayload, MainnetEthSpec, Slot}; type E = MainnetEthSpec; @@ -30,3 +32,96 @@ async fn deposit_contract_custom_network() { assert_eq!(result, expected); } + +// Test that running fork choice before proposing results in selection of the correct head. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn fork_choice_before_proposal() { + // Validator count needs to be at least 32 or proposer boost gets set to 0 when computing + // `validator_count // 32`. + let validator_count = 32; + let all_validators = (0..validator_count).collect::>(); + let num_initial: u64 = 31; + + let tester = InteractiveTester::::new(None, validator_count).await; + let harness = &tester.harness; + + // Create some chain depth. + harness.advance_slot(); + harness.extend_chain( + num_initial as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ); + + // We set up the following block graph, where B is a block that is temporarily orphaned by C, + // but is then reinstated and built upon by D. + // + // A | B | - | D | + // ^ | - | C | + let slot_a = Slot::new(num_initial); + let slot_b = slot_a + 1; + let slot_c = slot_a + 2; + let slot_d = slot_a + 3; + + let state_a = harness.get_current_state(); + let (block_b, state_b) = harness.make_block(state_a.clone(), slot_b); + let block_root_b = harness.process_block(slot_b, block_b).unwrap(); + + // Create attestations to B but keep them in reserve until after C has been processed. + let attestations_b = harness.make_attestations( + &all_validators, + &state_b, + state_b.tree_hash_root(), + block_root_b, + slot_b, + ); + + let (block_c, state_c) = harness.make_block(state_a, slot_c); + let block_root_c = harness.process_block(slot_c, block_c.clone()).unwrap(); + + // Create attestations to C from a small number of validators and process them immediately. + let attestations_c = harness.make_attestations( + &all_validators[..validator_count / 2], + &state_c, + state_c.tree_hash_root(), + block_root_c, + slot_c, + ); + harness.process_attestations(attestations_c); + + // Apply the attestations to B, but don't re-run fork choice. + harness.process_attestations(attestations_b); + + // Due to proposer boost, the head should be C during slot C. + assert_eq!( + harness.chain.head_info().unwrap().block_root, + block_root_c.into() + ); + + // Ensure that building a block via the HTTP API re-runs fork choice and builds block D upon B. + // Manually prod the per-slot task, because the slot timer doesn't run in the background in + // these tests. + harness.advance_slot(); + harness.chain.per_slot_task(); + + let proposer_index = state_b + .get_beacon_proposer_index(slot_d, &harness.chain.spec) + .unwrap(); + let randao_reveal = harness + .sign_randao_reveal(&state_b, proposer_index, slot_d) + .into(); + let block_d = tester + .client + .get_validator_blocks::>(slot_d, &randao_reveal, None) + .await + .unwrap() + .data; + + // Head is now B. + assert_eq!( + harness.chain.head_info().unwrap().block_root, + block_root_b.into() + ); + // D's parent is B. + assert_eq!(block_d.parent_root(), block_root_b.into()); +} diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 6e4c2996a6..a1347c9b02 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -686,4 +686,13 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { experimental as it may obscure performance issues.") .takes_value(false) ) + .arg( + Arg::with_name("fork-choice-before-proposal-timeout") + .long("fork-choice-before-proposal-timeout") + .help("Set the maximum number of milliseconds to wait for fork choice before \ + proposing a block. You can prevent waiting at all by setting the timeout \ + to 0, however you risk proposing atop the wrong parent block.") + .default_value("250") + .takes_value(true) + ) } diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 461f230d28..b1560c7955 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -581,6 +581,12 @@ pub fn get_config( client_config.chain.enable_lock_timeouts = false; } + if let Some(timeout) = + clap_utils::parse_optional(cli_args, "fork-choice-before-proposal-timeout")? + { + client_config.chain.fork_choice_before_proposal_timeout_ms = timeout; + } + Ok(client_config) } diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index 3088fa423d..5748bbd341 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -108,6 +108,26 @@ fn disable_lock_timeouts_flag() { .with_config(|config| assert!(!config.chain.enable_lock_timeouts)); } +#[test] +fn fork_choice_before_proposal_timeout_default() { + CommandLineTest::new() + .run_with_zero_port() + .with_config(|config| { + assert_eq!( + config.chain.fork_choice_before_proposal_timeout_ms, + beacon_node::beacon_chain::chain_config::DEFAULT_FORK_CHOICE_BEFORE_PROPOSAL_TIMEOUT + ) + }); +} + +#[test] +fn fork_choice_before_proposal_timeout_zero() { + CommandLineTest::new() + .flag("fork-choice-before-proposal-timeout", Some("0")) + .run_with_zero_port() + .with_config(|config| assert_eq!(config.chain.fork_choice_before_proposal_timeout_ms, 0)); +} + #[test] fn freezer_dir_flag() { let dir = TempDir::new().expect("Unable to create temporary directory");