mirror of
https://github.com/sigp/lighthouse.git
synced 2026-07-03 21:04:28 +00:00
Ensure lookup sync checks caches correctly (#5840)
* Ensure lookup sync checks caches correctly * fix tests and remove unused method * Simplify BlockProcessStatus
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
use crate::{metrics, BeaconChain, BeaconChainError, BeaconChainTypes};
|
||||
use crate::{metrics, BeaconChain, BeaconChainError, BeaconChainTypes, BlockProcessStatus};
|
||||
use execution_layer::{ExecutionLayer, ExecutionPayloadBodyV1};
|
||||
use slog::{crit, debug, error, Logger};
|
||||
use std::collections::HashMap;
|
||||
@@ -410,15 +410,14 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
|
||||
|
||||
fn check_caches(&self, root: Hash256) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> {
|
||||
if self.check_caches == CheckCaches::Yes {
|
||||
self.beacon_chain
|
||||
.reqresp_pre_import_cache
|
||||
.read()
|
||||
.get(&root)
|
||||
.map(|block| {
|
||||
match self.beacon_chain.get_block_process_status(&root) {
|
||||
BlockProcessStatus::Unknown => None,
|
||||
BlockProcessStatus::NotValidated(block)
|
||||
| BlockProcessStatus::ExecutionValidated(block) => {
|
||||
metrics::inc_counter(&metrics::BEACON_REQRESP_PRE_IMPORT_CACHE_HITS);
|
||||
block.clone()
|
||||
})
|
||||
.or(self.beacon_chain.early_attester_cache.get_block(root))
|
||||
Some(block)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
|
||||
@@ -337,6 +337,20 @@ struct PartialBeaconBlock<E: EthSpec> {
|
||||
bls_to_execution_changes: Vec<SignedBlsToExecutionChange>,
|
||||
}
|
||||
|
||||
pub enum BlockProcessStatus<E: EthSpec> {
|
||||
/// Block is not in any pre-import cache. Block may be in the data-base or in the fork-choice.
|
||||
Unknown,
|
||||
/// Block is currently processing but not yet validated.
|
||||
NotValidated(Arc<SignedBeaconBlock<E>>),
|
||||
/// Block is fully valid, but not yet imported. It's cached in the da_checker while awaiting
|
||||
/// missing block components.
|
||||
ExecutionValidated(Arc<SignedBeaconBlock<E>>),
|
||||
}
|
||||
|
||||
pub struct BeaconChainMetrics {
|
||||
pub reqresp_pre_import_cache_len: usize,
|
||||
}
|
||||
|
||||
pub type LightClientProducerEvent<T> = (Hash256, Slot, SyncAggregate<T>);
|
||||
|
||||
pub type BeaconForkChoice<T> = ForkChoice<
|
||||
@@ -1237,6 +1251,27 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
Ok(self.store.get_blinded_block(block_root)?)
|
||||
}
|
||||
|
||||
/// Return the status of a block as it progresses through the various caches of the beacon
|
||||
/// chain. Used by sync to learn the status of a block and prevent repeated downloads /
|
||||
/// processing attempts.
|
||||
pub fn get_block_process_status(&self, block_root: &Hash256) -> BlockProcessStatus<T::EthSpec> {
|
||||
if let Some(block) = self
|
||||
.data_availability_checker
|
||||
.get_execution_valid_block(block_root)
|
||||
{
|
||||
return BlockProcessStatus::ExecutionValidated(block);
|
||||
}
|
||||
|
||||
if let Some(block) = self.reqresp_pre_import_cache.read().get(block_root) {
|
||||
// A block is on the `reqresp_pre_import_cache` but NOT in the
|
||||
// `data_availability_checker` only if it is actively processing. We can expect a future
|
||||
// event with the result of processing
|
||||
return BlockProcessStatus::NotValidated(block.clone());
|
||||
}
|
||||
|
||||
BlockProcessStatus::Unknown
|
||||
}
|
||||
|
||||
/// Returns the state at the given root, if any.
|
||||
///
|
||||
/// ## Errors
|
||||
@@ -6630,6 +6665,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
ForkName::Base => Err(Error::UnsupportedFork),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn metrics(&self) -> BeaconChainMetrics {
|
||||
BeaconChainMetrics {
|
||||
reqresp_pre_import_cache_len: self.reqresp_pre_import_cache.read().len(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes> Drop for BeaconChain<T> {
|
||||
|
||||
@@ -86,17 +86,12 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
|
||||
|
||||
/// Checks if the block root is currenlty in the availability cache awaiting import because
|
||||
/// of missing components.
|
||||
pub fn has_execution_valid_block(&self, block_root: &Hash256) -> bool {
|
||||
pub fn get_execution_valid_block(
|
||||
&self,
|
||||
block_root: &Hash256,
|
||||
) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> {
|
||||
self.availability_cache
|
||||
.has_execution_valid_block(block_root)
|
||||
}
|
||||
|
||||
/// Return the required blobs `block_root` expects if the block is currenlty in the cache.
|
||||
pub fn num_expected_blobs(&self, block_root: &Hash256) -> Option<usize> {
|
||||
self.availability_cache
|
||||
.peek_pending_components(block_root, |components| {
|
||||
components.and_then(|components| components.num_expected_blobs())
|
||||
})
|
||||
.get_execution_valid_block(block_root)
|
||||
}
|
||||
|
||||
/// Return the set of imported blob indexes for `block_root`. Returns None if there is no block
|
||||
|
||||
@@ -44,7 +44,7 @@ use ssz_types::{FixedVector, VariableList};
|
||||
use std::num::NonZeroUsize;
|
||||
use std::{collections::HashSet, sync::Arc};
|
||||
use types::blob_sidecar::BlobIdentifier;
|
||||
use types::{BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256};
|
||||
use types::{BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock};
|
||||
|
||||
/// This represents the components of a partially available block
|
||||
///
|
||||
@@ -544,12 +544,19 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
|
||||
}
|
||||
|
||||
/// Returns true if the block root is known, without altering the LRU ordering
|
||||
pub fn has_execution_valid_block(&self, block_root: &Hash256) -> bool {
|
||||
if let Some(pending_components) = self.critical.read().peek_pending_components(block_root) {
|
||||
pending_components.executed_block.is_some()
|
||||
} else {
|
||||
false
|
||||
}
|
||||
pub fn get_execution_valid_block(
|
||||
&self,
|
||||
block_root: &Hash256,
|
||||
) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> {
|
||||
self.critical
|
||||
.read()
|
||||
.peek_pending_components(block_root)
|
||||
.and_then(|pending_components| {
|
||||
pending_components
|
||||
.executed_block
|
||||
.as_ref()
|
||||
.map(|block| block.block_cloned())
|
||||
})
|
||||
}
|
||||
|
||||
/// Fetch a blob from the cache without affecting the LRU ordering
|
||||
|
||||
@@ -37,6 +37,10 @@ impl<E: EthSpec> DietAvailabilityPendingExecutedBlock<E> {
|
||||
&self.block
|
||||
}
|
||||
|
||||
pub fn block_cloned(&self) -> Arc<SignedBeaconBlock<E>> {
|
||||
self.block.clone()
|
||||
}
|
||||
|
||||
pub fn num_blobs_expected(&self) -> usize {
|
||||
self.block
|
||||
.message()
|
||||
|
||||
@@ -62,9 +62,10 @@ pub mod validator_pubkey_cache;
|
||||
|
||||
pub use self::beacon_chain::{
|
||||
AttestationProcessingOutcome, AvailabilityProcessingStatus, BeaconBlockResponse,
|
||||
BeaconBlockResponseWrapper, BeaconChain, BeaconChainTypes, BeaconStore, ChainSegmentResult,
|
||||
ForkChoiceError, LightClientProducerEvent, OverrideForkchoiceUpdate, ProduceBlockVerification,
|
||||
StateSkipConfig, WhenSlotSkipped, INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON,
|
||||
BeaconBlockResponseWrapper, BeaconChain, BeaconChainTypes, BeaconStore, BlockProcessStatus,
|
||||
ChainSegmentResult, ForkChoiceError, LightClientProducerEvent, OverrideForkchoiceUpdate,
|
||||
ProduceBlockVerification, StateSkipConfig, WhenSlotSkipped,
|
||||
INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON,
|
||||
INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON,
|
||||
};
|
||||
pub use self::beacon_snapshot::BeaconSnapshot;
|
||||
|
||||
@@ -1192,6 +1192,7 @@ pub fn scrape_for_metrics<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>) {
|
||||
}
|
||||
|
||||
let attestation_stats = beacon_chain.op_pool.attestation_stats();
|
||||
let chain_metrics = beacon_chain.metrics();
|
||||
|
||||
set_gauge_by_usize(
|
||||
&BLOCK_PROCESSING_SNAPSHOT_CACHE_SIZE,
|
||||
@@ -1200,7 +1201,7 @@ pub fn scrape_for_metrics<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>) {
|
||||
|
||||
set_gauge_by_usize(
|
||||
&BEACON_REQRESP_PRE_IMPORT_CACHE_SIZE,
|
||||
beacon_chain.reqresp_pre_import_cache.read().len(),
|
||||
chain_metrics.reqresp_pre_import_cache_len,
|
||||
);
|
||||
|
||||
let da_checker_metrics = beacon_chain.data_availability_checker.metrics();
|
||||
|
||||
@@ -1452,13 +1452,16 @@ fn block_in_processing_cache_becomes_invalid() {
|
||||
let peer_id = r.new_connected_peer();
|
||||
r.insert_block_to_processing_cache(block.clone().into());
|
||||
r.trigger_unknown_block_from_attestation(block_root, peer_id);
|
||||
// Should trigger blob request
|
||||
let id = r.expect_blob_lookup_request(block_root);
|
||||
// Should not trigger block request
|
||||
r.expect_empty_network();
|
||||
// Simulate invalid block, removing it from processing cache
|
||||
r.simulate_block_gossip_processing_becomes_invalid(block_root);
|
||||
// Should download block, then issue blobs request
|
||||
r.complete_lookup_block_download(block);
|
||||
let id = r.expect_blob_lookup_request(block_root);
|
||||
// Should not trigger block or blob request
|
||||
r.expect_empty_network();
|
||||
r.complete_lookup_block_import_valid(block_root, false);
|
||||
// Resolve blob and expect lookup completed
|
||||
r.complete_single_lookup_blob_lookup_valid(id, peer_id, blobs, true);
|
||||
@@ -1475,11 +1478,14 @@ fn block_in_processing_cache_becomes_valid_imported() {
|
||||
let peer_id = r.new_connected_peer();
|
||||
r.insert_block_to_processing_cache(block.clone().into());
|
||||
r.trigger_unknown_block_from_attestation(block_root, peer_id);
|
||||
// Should trigger blob request
|
||||
let id = r.expect_blob_lookup_request(block_root);
|
||||
// Should not trigger block request
|
||||
r.expect_empty_network();
|
||||
// Resolve the block from processing step
|
||||
r.simulate_block_gossip_processing_becomes_valid_missing_components(block.into());
|
||||
let id = r.expect_blob_lookup_request(block_root);
|
||||
// Should not trigger block or blob request
|
||||
r.expect_empty_network();
|
||||
// Resolve blob and expect lookup completed
|
||||
r.complete_single_lookup_blob_lookup_valid(id, peer_id, blobs, true);
|
||||
r.expect_no_active_lookups();
|
||||
|
||||
@@ -12,7 +12,7 @@ use crate::status::ToStatusMessage;
|
||||
use crate::sync::block_lookups::SingleLookupId;
|
||||
use crate::sync::manager::{BlockProcessType, SingleLookupReqId};
|
||||
use beacon_chain::block_verification_types::RpcBlock;
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState};
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState};
|
||||
use fnv::FnvHashMap;
|
||||
use lighthouse_network::rpc::methods::BlobsByRangeRequest;
|
||||
use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError};
|
||||
@@ -337,26 +337,17 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
peer_id: PeerId,
|
||||
block_root: Hash256,
|
||||
) -> Result<LookupRequestResult, RpcRequestSendError> {
|
||||
// da_checker includes block that are execution verified, but are missing components
|
||||
if self
|
||||
.chain
|
||||
.data_availability_checker
|
||||
.has_execution_valid_block(&block_root)
|
||||
{
|
||||
return Ok(LookupRequestResult::NoRequestNeeded);
|
||||
}
|
||||
|
||||
// reqresp_pre_import_cache includes blocks that may not be yet execution verified
|
||||
if self
|
||||
.chain
|
||||
.reqresp_pre_import_cache
|
||||
.read()
|
||||
.contains_key(&block_root)
|
||||
{
|
||||
// A block is on the `reqresp_pre_import_cache` but NOT in the
|
||||
// `data_availability_checker` only if it is actively processing. We can expect a future
|
||||
// event with the result of processing
|
||||
return Ok(LookupRequestResult::Pending);
|
||||
match self.chain.get_block_process_status(&block_root) {
|
||||
// Unknown block, continue request to download
|
||||
BlockProcessStatus::Unknown => {}
|
||||
// Block is known are currently processing, expect a future event with the result of
|
||||
// processing.
|
||||
BlockProcessStatus::NotValidated { .. } => return Ok(LookupRequestResult::Pending),
|
||||
// Block is fully validated. If it's not yet imported it's waiting for missing block
|
||||
// components. Consider this request completed and do nothing.
|
||||
BlockProcessStatus::ExecutionValidated { .. } => {
|
||||
return Ok(LookupRequestResult::NoRequestNeeded)
|
||||
}
|
||||
}
|
||||
|
||||
let req_id = self.next_id();
|
||||
@@ -401,9 +392,14 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
downloaded_block_expected_blobs: Option<usize>,
|
||||
) -> Result<LookupRequestResult, RpcRequestSendError> {
|
||||
let Some(expected_blobs) = downloaded_block_expected_blobs.or_else(|| {
|
||||
self.chain
|
||||
.data_availability_checker
|
||||
.num_expected_blobs(&block_root)
|
||||
// If the block is already being processed or fully validated, retrieve how many blobs
|
||||
// it expects. Consider any stage of the block. If the block root has been validated, we
|
||||
// can assert that this is the correct value of `blob_kzg_commitments_count`.
|
||||
match self.chain.get_block_process_status(&block_root) {
|
||||
BlockProcessStatus::Unknown => None,
|
||||
BlockProcessStatus::NotValidated(block)
|
||||
| BlockProcessStatus::ExecutionValidated(block) => Some(block.num_expected_blobs()),
|
||||
}
|
||||
}) else {
|
||||
// Wait to download the block before downloading blobs. Then we can be sure that the
|
||||
// block has data, so there's no need to do "blind" requests for all possible blobs and
|
||||
|
||||
Reference in New Issue
Block a user