check the da cache and the attester cache in responding to RPC requests

This commit is contained in:
realbigsean
2024-01-29 15:52:42 -05:00
parent c7e5dd1098
commit 66b911fd80
5 changed files with 48 additions and 28 deletions

View File

@@ -19,7 +19,7 @@ use types::{
}; };
#[derive(PartialEq)] #[derive(PartialEq)]
pub enum CheckEarlyAttesterCache { pub enum CheckCaches {
Yes, Yes,
No, No,
} }
@@ -385,14 +385,14 @@ impl<E: EthSpec> EngineRequest<E> {
pub struct BeaconBlockStreamer<T: BeaconChainTypes> { pub struct BeaconBlockStreamer<T: BeaconChainTypes> {
execution_layer: ExecutionLayer<T::EthSpec>, execution_layer: ExecutionLayer<T::EthSpec>,
check_early_attester_cache: CheckEarlyAttesterCache, check_caches: CheckCaches,
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Arc<BeaconChain<T>>,
} }
impl<T: BeaconChainTypes> BeaconBlockStreamer<T> { impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
pub fn new( pub fn new(
beacon_chain: &Arc<BeaconChain<T>>, beacon_chain: &Arc<BeaconChain<T>>,
check_early_attester_cache: CheckEarlyAttesterCache, check_caches: CheckCaches,
) -> Result<Self, BeaconChainError> { ) -> Result<Self, BeaconChainError> {
let execution_layer = beacon_chain let execution_layer = beacon_chain
.execution_layer .execution_layer
@@ -402,17 +402,17 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
Ok(Self { Ok(Self {
execution_layer, execution_layer,
check_early_attester_cache, check_caches,
beacon_chain: beacon_chain.clone(), beacon_chain: beacon_chain.clone(),
}) })
} }
fn check_early_attester_cache( fn check_caches(&self, root: Hash256) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> {
&self, if self.check_caches == CheckCaches::Yes {
root: Hash256, self.beacon_chain
) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> { .data_availability_checker
if self.check_early_attester_cache == CheckEarlyAttesterCache::Yes { .get_block(&root)
self.beacon_chain.early_attester_cache.get_block(root) .or(self.beacon_chain.early_attester_cache.get_block(root))
} else { } else {
None None
} }
@@ -422,10 +422,7 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
let mut db_blocks = Vec::new(); let mut db_blocks = Vec::new();
for root in block_roots { for root in block_roots {
if let Some(cached_block) = self if let Some(cached_block) = self.check_caches(root).map(LoadedBeaconBlock::Full) {
.check_early_attester_cache(root)
.map(LoadedBeaconBlock::Full)
{
db_blocks.push((root, Ok(Some(cached_block)))); db_blocks.push((root, Ok(Some(cached_block))));
continue; continue;
} }
@@ -554,7 +551,7 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
"Using slower fallback method of eth_getBlockByHash()" "Using slower fallback method of eth_getBlockByHash()"
); );
for root in block_roots { for root in block_roots {
let cached_block = self.check_early_attester_cache(root); let cached_block = self.check_caches(root);
let block_result = if cached_block.is_some() { let block_result = if cached_block.is_some() {
Ok(cached_block) Ok(cached_block)
} else { } else {
@@ -682,7 +679,7 @@ impl From<Error> for BeaconChainError {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckEarlyAttesterCache}; use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckCaches};
use crate::test_utils::{test_spec, BeaconChainHarness, EphemeralHarnessType}; use crate::test_utils::{test_spec, BeaconChainHarness, EphemeralHarnessType};
use execution_layer::test_utils::{Block, DEFAULT_ENGINE_CAPABILITIES}; use execution_layer::test_utils::{Block, DEFAULT_ENGINE_CAPABILITIES};
use execution_layer::EngineCapabilities; use execution_layer::EngineCapabilities;
@@ -804,7 +801,7 @@ mod tests {
let start = epoch * slots_per_epoch; let start = epoch * slots_per_epoch;
let mut epoch_roots = vec![Hash256::zero(); slots_per_epoch]; let mut epoch_roots = vec![Hash256::zero(); slots_per_epoch];
epoch_roots[..].clone_from_slice(&block_roots[start..(start + slots_per_epoch)]); epoch_roots[..].clone_from_slice(&block_roots[start..(start + slots_per_epoch)]);
let streamer = BeaconBlockStreamer::new(&harness.chain, CheckEarlyAttesterCache::No) let streamer = BeaconBlockStreamer::new(&harness.chain, CheckCaches::No)
.expect("should create streamer"); .expect("should create streamer");
let (block_tx, mut block_rx) = mpsc::unbounded_channel(); let (block_tx, mut block_rx) = mpsc::unbounded_channel();
streamer.stream(epoch_roots.clone(), block_tx).await; streamer.stream(epoch_roots.clone(), block_tx).await;
@@ -945,7 +942,7 @@ mod tests {
let start = epoch * slots_per_epoch; let start = epoch * slots_per_epoch;
let mut epoch_roots = vec![Hash256::zero(); slots_per_epoch]; let mut epoch_roots = vec![Hash256::zero(); slots_per_epoch];
epoch_roots[..].clone_from_slice(&block_roots[start..(start + slots_per_epoch)]); epoch_roots[..].clone_from_slice(&block_roots[start..(start + slots_per_epoch)]);
let streamer = BeaconBlockStreamer::new(&harness.chain, CheckEarlyAttesterCache::No) let streamer = BeaconBlockStreamer::new(&harness.chain, CheckCaches::No)
.expect("should create streamer"); .expect("should create streamer");
let (block_tx, mut block_rx) = mpsc::unbounded_channel(); let (block_tx, mut block_rx) = mpsc::unbounded_channel();
streamer.stream(epoch_roots.clone(), block_tx).await; streamer.stream(epoch_roots.clone(), block_tx).await;

View File

@@ -4,7 +4,7 @@ use crate::attestation_verification::{
VerifiedUnaggregatedAttestation, VerifiedUnaggregatedAttestation,
}; };
use crate::attester_cache::{AttesterCache, AttesterCacheKey}; use crate::attester_cache::{AttesterCache, AttesterCacheKey};
use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckEarlyAttesterCache}; use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckCaches};
use crate::beacon_proposer_cache::compute_proposer_duties_from_head; use crate::beacon_proposer_cache::compute_proposer_duties_from_head;
use crate::beacon_proposer_cache::BeaconProposerCache; use crate::beacon_proposer_cache::BeaconProposerCache;
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
@@ -1141,10 +1141,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
>, >,
Error, Error,
> { > {
Ok( Ok(BeaconBlockStreamer::<T>::new(self, CheckCaches::Yes)?
BeaconBlockStreamer::<T>::new(self, CheckEarlyAttesterCache::Yes)? .launch_stream(block_roots, executor))
.launch_stream(block_roots, executor),
)
} }
pub fn get_blocks( pub fn get_blocks(
@@ -1160,10 +1158,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
>, >,
Error, Error,
> { > {
Ok( Ok(BeaconBlockStreamer::<T>::new(self, CheckCaches::No)?
BeaconBlockStreamer::<T>::new(self, CheckEarlyAttesterCache::No)? .launch_stream(block_roots, executor))
.launch_stream(block_roots, executor),
)
} }
pub fn get_blobs_checking_early_attester_cache( pub fn get_blobs_checking_early_attester_cache(

View File

@@ -192,6 +192,12 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
self.availability_cache.peek_blob(blob_id) self.availability_cache.peek_blob(blob_id)
} }
/// Get a block from the availability cache. Only checks for blocks stored in memory. Useful
/// for serving RPC requests.
pub fn get_block(&self, block_root: &Hash256) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> {
self.availability_cache.peek_block(block_root)
}
/// Put a list of blobs received via RPC into the availability cache. This performs KZG /// Put a list of blobs received via RPC into the availability cache. This performs KZG
/// verification on the blobs in the list. /// verification on the blobs in the list.
pub fn put_rpc_blobs( pub fn put_rpc_blobs(

View File

@@ -45,7 +45,7 @@ use ssz_types::{FixedVector, VariableList};
use std::num::NonZeroUsize; use std::num::NonZeroUsize;
use std::{collections::HashSet, sync::Arc}; use std::{collections::HashSet, sync::Arc};
use types::blob_sidecar::BlobIdentifier; use types::blob_sidecar::BlobIdentifier;
use types::{BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256}; use types::{BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock};
/// This represents the components of a partially available block /// This represents the components of a partially available block
/// ///
@@ -322,6 +322,18 @@ impl<T: BeaconChainTypes> Critical<T> {
} }
} }
/// This only checks for the blocks in memory
pub fn peek_block(&self, block_root: &Hash256) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> {
self.in_memory
.peek(block_root)
.and_then(|pending_components| {
pending_components
.executed_block
.as_ref()
.map(|block| block.block_cloned())
})
}
/// Puts the pending components in the LRU cache. If the cache /// Puts the pending components in the LRU cache. If the cache
/// is at capacity, the LRU entry is written to the store first /// is at capacity, the LRU entry is written to the store first
pub fn put_pending_components( pub fn put_pending_components(
@@ -399,6 +411,11 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
}) })
} }
/// Just checks for blocks stored in memory
pub fn peek_block(&self, block_root: &Hash256) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> {
self.critical.read().peek_block(block_root)
}
/// Fetch a blob from the cache without affecting the LRU ordering /// Fetch a blob from the cache without affecting the LRU ordering
pub fn peek_blob( pub fn peek_blob(
&self, &self,

View File

@@ -35,6 +35,10 @@ impl<E: EthSpec> DietAvailabilityPendingExecutedBlock<E> {
&self.block &self.block
} }
pub fn block_cloned(&self) -> Arc<SignedBeaconBlock<E>> {
self.block.clone()
}
pub fn num_blobs_expected(&self) -> usize { pub fn num_blobs_expected(&self) -> usize {
self.block self.block
.message() .message()