diff --git a/beacon_node/beacon_chain/src/attestation_verification.rs b/beacon_node/beacon_chain/src/attestation_verification.rs index c672ff6be6..85d7b2b7d5 100644 --- a/beacon_node/beacon_chain/src/attestation_verification.rs +++ b/beacon_node/beacon_chain/src/attestation_verification.rs @@ -986,11 +986,17 @@ fn verify_head_block_is_known( attestation: &Attestation, max_skip_slots: Option, ) -> Result { - if let Some(block) = chain + let block_opt = chain .fork_choice .read() .get_block(&attestation.data.beacon_block_root) - { + .or_else(|| { + chain + .early_attester_cache + .get_proto_block(attestation.data.beacon_block_root) + }); + + if let Some(block) = block_opt { // Reject any block that exceeds our limit on skipped slots. if let Some(max_skip_slots) = max_skip_slots { if attestation.data.slot > block.slot + max_skip_slots { @@ -1242,7 +1248,9 @@ where // processing an attestation that does not include our latest finalized block in its chain. // // We do not delay consideration for later, we simply drop the attestation. - if !chain.fork_choice.read().contains_block(&target.root) { + if !chain.fork_choice.read().contains_block(&target.root) + && !chain.early_attester_cache.contains_block(target.root) + { return Err(Error::UnknownTargetRoot(target.root)); } diff --git a/beacon_node/beacon_chain/src/attester_cache.rs b/beacon_node/beacon_chain/src/attester_cache.rs index 01662efc13..24963a125d 100644 --- a/beacon_node/beacon_chain/src/attester_cache.rs +++ b/beacon_node/beacon_chain/src/attester_cache.rs @@ -75,7 +75,7 @@ impl From for Error { /// Stores the minimal amount of data required to compute the committee length for any committee at any /// slot in a given `epoch`. -struct CommitteeLengths { +pub struct CommitteeLengths { /// The `epoch` to which the lengths pertain. epoch: Epoch, /// The length of the shuffling in `self.epoch`. @@ -84,7 +84,7 @@ struct CommitteeLengths { impl CommitteeLengths { /// Instantiate `Self` using `state.current_epoch()`. - fn new(state: &BeaconState, spec: &ChainSpec) -> Result { + pub fn new(state: &BeaconState, spec: &ChainSpec) -> Result { let active_validator_indices_len = if let Ok(committee_cache) = state.committee_cache(RelativeEpoch::Current) { @@ -101,8 +101,16 @@ impl CommitteeLengths { }) } + /// Get the count of committees per each slot of `self.epoch`. + pub fn get_committee_count_per_slot( + &self, + spec: &ChainSpec, + ) -> Result { + T::get_committee_count_per_slot(self.active_validator_indices_len, spec).map_err(Into::into) + } + /// Get the length of the committee at the given `slot` and `committee_index`. - fn get( + pub fn get_committee_length( &self, slot: Slot, committee_index: CommitteeIndex, @@ -120,8 +128,7 @@ impl CommitteeLengths { } let slots_per_epoch = slots_per_epoch as usize; - let committees_per_slot = - T::get_committee_count_per_slot(self.active_validator_indices_len, spec)?; + let committees_per_slot = self.get_committee_count_per_slot::(spec)?; let index_in_epoch = compute_committee_index_in_epoch( slot, slots_per_epoch, @@ -172,7 +179,7 @@ impl AttesterCacheValue { spec: &ChainSpec, ) -> Result<(JustifiedCheckpoint, CommitteeLength), Error> { self.committee_lengths - .get::(slot, committee_index, spec) + .get_committee_length::(slot, committee_index, spec) .map(|committee_length| (self.current_justified_checkpoint, committee_length)) } } diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 6edcb7d6c9..f2a2271542 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -12,6 +12,7 @@ use crate::block_verification::{ IntoFullyVerifiedBlock, }; use crate::chain_config::ChainConfig; +use crate::early_attester_cache::EarlyAttesterCache; use crate::errors::{BeaconChainError as Error, BlockProductionError}; use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend}; use crate::events::ServerSentEventHandler; @@ -107,6 +108,9 @@ pub const OP_POOL_DB_KEY: Hash256 = Hash256::zero(); pub const ETH1_CACHE_DB_KEY: Hash256 = Hash256::zero(); pub const FORK_CHOICE_DB_KEY: Hash256 = Hash256::zero(); +/// Defines how old a block can be before it's no longer a candidate for the early attester cache. +const EARLY_ATTESTER_CACHE_HISTORIC_SLOTS: u64 = 4; + /// Defines the behaviour when a block/block-root for a skipped slot is requested. pub enum WhenSlotSkipped { /// If the slot is a skip slot, return `None`. @@ -328,6 +332,8 @@ pub struct BeaconChain { pub(crate) validator_pubkey_cache: TimeoutRwLock>, /// A cache used when producing attestations. pub(crate) attester_cache: Arc, + /// A cache used when producing attestations whilst the head block is still being imported. + pub early_attester_cache: EarlyAttesterCache, /// A cache used to keep track of various block timings. pub block_times_cache: Arc>, /// A list of any hard-coded forks that have been disabled. @@ -926,6 +932,28 @@ impl BeaconChain { )? } + /// Returns the block at the given root, if any. + /// + /// Will also check the early attester cache for the block. Because of this, there's no + /// guarantee that a block returned from this function has a `BeaconState` available in + /// `self.store`. The expected use for this function is *only* for returning blocks requested + /// from P2P peers. + /// + /// ## Errors + /// + /// May return a database error. + pub fn get_block_checking_early_attester_cache( + &self, + block_root: &Hash256, + ) -> Result>, Error> { + let block_opt = self + .store + .get_block(block_root)? + .or_else(|| self.early_attester_cache.get_block(*block_root)); + + Ok(block_opt) + } + /// Returns the block at the given root, if any. /// /// ## Errors @@ -1422,6 +1450,29 @@ impl BeaconChain { ) -> Result, Error> { let _total_timer = metrics::start_timer(&metrics::ATTESTATION_PRODUCTION_SECONDS); + // The early attester cache will return `Some(attestation)` in the scenario where there is a + // block being imported that will become the head block, but that block has not yet been + // inserted into the database and set as `self.canonical_head`. + // + // In effect, the early attester cache prevents slow database IO from causing missed + // head/target votes. + match self + .early_attester_cache + .try_attest(request_slot, request_index, &self.spec) + { + // The cache matched this request, return the value. + Ok(Some(attestation)) => return Ok(attestation), + // The cache did not match this request, proceed with the rest of this function. + Ok(None) => (), + // The cache returned an error. Log the error and proceed with the rest of this + // function. + Err(e) => warn!( + self.log, + "Early attester cache failed"; + "error" => ?e + ), + } + let slots_per_epoch = T::EthSpec::slots_per_epoch(); let request_epoch = request_slot.epoch(slots_per_epoch); @@ -2602,6 +2653,42 @@ impl BeaconChain { } } + // If the block is recent enough, check to see if it becomes the head block. If so, apply it + // to the early attester cache. This will allow attestations to the block without waiting + // for the block and state to be inserted to the database. + // + // Only performing this check on recent blocks avoids slowing down sync with lots of calls + // to fork choice `get_head`. + if block.slot() + EARLY_ATTESTER_CACHE_HISTORIC_SLOTS >= current_slot { + let new_head_root = fork_choice + .get_head(current_slot, &self.spec) + .map_err(BeaconChainError::from)?; + + if new_head_root == block_root { + if let Some(proto_block) = fork_choice.get_block(&block_root) { + if let Err(e) = self.early_attester_cache.add_head_block( + block_root, + signed_block.clone(), + proto_block, + &state, + &self.spec, + ) { + warn!( + self.log, + "Early attester cache insert failed"; + "error" => ?e + ); + } + } else { + warn!( + self.log, + "Early attester block missing"; + "block_root" => ?block_root + ); + } + } + } + // Register sync aggregate with validator monitor if let Ok(sync_aggregate) = block.body().sync_aggregate() { // `SyncCommittee` for the sync_aggregate should correspond to the duty slot @@ -3248,6 +3335,9 @@ impl BeaconChain { drop(lag_timer); + // Clear the early attester cache in case it conflicts with `self.canonical_head`. + self.early_attester_cache.clear(); + // Update the snapshot that stores the head of the chain at the time it received the // block. *self diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 54397a7d55..4662d05d3d 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -763,6 +763,7 @@ where block_times_cache: <_>::default(), validator_pubkey_cache: TimeoutRwLock::new(validator_pubkey_cache), attester_cache: <_>::default(), + early_attester_cache: <_>::default(), disabled_forks: self.disabled_forks, shutdown_sender: self .shutdown_sender diff --git a/beacon_node/beacon_chain/src/early_attester_cache.rs b/beacon_node/beacon_chain/src/early_attester_cache.rs new file mode 100644 index 0000000000..56dced94e6 --- /dev/null +++ b/beacon_node/beacon_chain/src/early_attester_cache.rs @@ -0,0 +1,161 @@ +use crate::{ + attester_cache::{CommitteeLengths, Error}, + metrics, +}; +use parking_lot::RwLock; +use proto_array::Block as ProtoBlock; +use types::*; + +pub struct CacheItem { + /* + * Values used to create attestations. + */ + epoch: Epoch, + committee_lengths: CommitteeLengths, + beacon_block_root: Hash256, + source: Checkpoint, + target: Checkpoint, + /* + * Values used to make the block available. + */ + block: SignedBeaconBlock, + proto_block: ProtoBlock, +} + +/// Provides a single-item cache which allows for attesting to blocks before those blocks have +/// reached the database. +/// +/// This cache stores enough information to allow Lighthouse to: +/// +/// - Produce an attestation without using `chain.canonical_head`. +/// - Verify that a block root exists (i.e., will be imported in the future) during attestation +/// verification. +/// - Provide a block which can be sent to peers via RPC. +#[derive(Default)] +pub struct EarlyAttesterCache { + item: RwLock>>, +} + +impl EarlyAttesterCache { + /// Removes the cached item, meaning that all future calls to `Self::try_attest` will return + /// `None` until a new cache item is added. + pub fn clear(&self) { + *self.item.write() = None + } + + /// Updates the cache item, so that `Self::try_attest` with return `Some` when given suitable + /// parameters. + pub fn add_head_block( + &self, + beacon_block_root: Hash256, + block: SignedBeaconBlock, + proto_block: ProtoBlock, + state: &BeaconState, + spec: &ChainSpec, + ) -> Result<(), Error> { + let epoch = state.current_epoch(); + let committee_lengths = CommitteeLengths::new(state, spec)?; + let source = state.current_justified_checkpoint(); + let target_slot = epoch.start_slot(E::slots_per_epoch()); + let target = Checkpoint { + epoch, + root: if state.slot() <= target_slot { + beacon_block_root + } else { + *state.get_block_root(target_slot)? + }, + }; + + let item = CacheItem { + epoch, + committee_lengths, + beacon_block_root, + source, + target, + block, + proto_block, + }; + + *self.item.write() = Some(item); + + Ok(()) + } + + /// Will return `Some(attestation)` if all the following conditions are met: + /// + /// - There is a cache `item` present. + /// - If `request_slot` is in the same epoch as `item.epoch`. + /// - If `request_index` does not exceed `item.comittee_count`. + pub fn try_attest( + &self, + request_slot: Slot, + request_index: CommitteeIndex, + spec: &ChainSpec, + ) -> Result>, Error> { + let lock = self.item.read(); + let item = if let Some(item) = lock.as_ref() { + item + } else { + return Ok(None); + }; + + let request_epoch = request_slot.epoch(E::slots_per_epoch()); + if request_epoch != item.epoch { + return Ok(None); + } + + let committee_count = item + .committee_lengths + .get_committee_count_per_slot::(spec)?; + if request_index >= committee_count as u64 { + return Ok(None); + } + + let committee_len = + item.committee_lengths + .get_committee_length::(request_slot, request_index, spec)?; + + let attestation = Attestation { + aggregation_bits: BitList::with_capacity(committee_len) + .map_err(BeaconStateError::from)?, + data: AttestationData { + slot: request_slot, + index: request_index, + beacon_block_root: item.beacon_block_root, + source: item.source, + target: item.target, + }, + signature: AggregateSignature::empty(), + }; + + metrics::inc_counter(&metrics::BEACON_EARLY_ATTESTER_CACHE_HITS); + + Ok(Some(attestation)) + } + + /// Returns `true` if `block_root` matches the cached item. + pub fn contains_block(&self, block_root: Hash256) -> bool { + self.item + .read() + .as_ref() + .map_or(false, |item| item.beacon_block_root == block_root) + } + + /// Returns the block, if `block_root` matches the cached item. + pub fn get_block(&self, block_root: Hash256) -> Option> { + self.item + .read() + .as_ref() + .filter(|item| item.beacon_block_root == block_root) + .map(|item| item.block.clone()) + } + + /// Returns the proto-array block, if `block_root` matches the cached item. + pub fn get_proto_block(&self, block_root: Hash256) -> Option { + self.item + .read() + .as_ref() + .filter(|item| item.beacon_block_root == block_root) + .map(|item| item.proto_block.clone()) + } +} diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 513467cef8..768a869551 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -9,6 +9,7 @@ mod block_times_cache; mod block_verification; pub mod builder; pub mod chain_config; +mod early_attester_cache; mod errors; pub mod eth1_chain; pub mod events; diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 32ebe70921..32dfc266f3 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -240,6 +240,14 @@ lazy_static! { pub static ref SHUFFLING_CACHE_MISSES: Result = try_create_int_counter("beacon_shuffling_cache_misses_total", "Count of times shuffling cache fulfils request"); + /* + * Early attester cache + */ + pub static ref BEACON_EARLY_ATTESTER_CACHE_HITS: Result = try_create_int_counter( + "beacon_early_attester_cache_hits", + "Count of times the early attester cache returns an attestation" + ); + /* * Attestation Production */ diff --git a/beacon_node/beacon_chain/tests/attestation_production.rs b/beacon_node/beacon_chain/tests/attestation_production.rs index 1ce2411c41..4d862cbac7 100644 --- a/beacon_node/beacon_chain/tests/attestation_production.rs +++ b/beacon_node/beacon_chain/tests/attestation_production.rs @@ -122,6 +122,24 @@ fn produces_attestations() { ); assert_eq!(data.target.epoch, state.current_epoch(), "bad target epoch"); assert_eq!(data.target.root, target_root, "bad target root"); + + let early_attestation = { + let proto_block = chain.fork_choice.read().get_block(&block_root).unwrap(); + chain + .early_attester_cache + .add_head_block(block_root, block.clone(), proto_block, &state, &chain.spec) + .unwrap(); + chain + .early_attester_cache + .try_attest(slot, index, &chain.spec) + .unwrap() + .unwrap() + }; + + assert_eq!( + attestation, early_attestation, + "early attester cache inconsistent" + ); } } } diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index f3d49c2b42..f79a655745 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -129,7 +129,7 @@ impl Worker { ) { let mut send_block_count = 0; for root in request.block_roots.iter() { - if let Ok(Some(block)) = self.chain.store.get_block(root) { + if let Ok(Some(block)) = self.chain.get_block_checking_early_attester_cache(root) { self.send_response( peer_id, Response::BlocksByRoot(Some(Box::new(block))),