Merge branch 'eip4844' into deneb-free-blobs

This commit is contained in:
Diva M
2023-03-24 14:38:29 -05:00
63 changed files with 1852 additions and 536 deletions

View File

@@ -41,6 +41,7 @@ state_processing = { path = "../../consensus/state_processing" }
tree_hash = "0.4.1"
types = { path = "../../consensus/types" }
tokio = "1.14.0"
tokio-stream = "0.1.3"
eth1 = { path = "../eth1" }
futures = "0.3.7"
genesis = { path = "../genesis" }

View File

@@ -0,0 +1,974 @@
use crate::{BeaconChain, BeaconChainError, BeaconChainTypes};
use execution_layer::{ExecutionLayer, ExecutionPayloadBodyV1};
use slog::{crit, debug, Logger};
use std::collections::HashMap;
use std::sync::Arc;
use store::{DatabaseBlock, ExecutionPayloadEip4844};
use task_executor::TaskExecutor;
use tokio::sync::{
mpsc::{self, UnboundedSender},
RwLock,
};
use tokio_stream::{wrappers::UnboundedReceiverStream, Stream};
use types::{
ChainSpec, EthSpec, ExecPayload, ExecutionBlockHash, ForkName, Hash256, SignedBeaconBlock,
SignedBlindedBeaconBlock, Slot,
};
use types::{
ExecutionPayload, ExecutionPayloadCapella, ExecutionPayloadHeader, ExecutionPayloadMerge,
};
#[derive(PartialEq)]
pub enum CheckEarlyAttesterCache {
Yes,
No,
}
#[derive(Debug)]
pub enum Error {
PayloadReconstruction(String),
BlocksByRangeFailure(Box<execution_layer::Error>),
RequestNotFound,
BlockResultNotFound,
}
const BLOCKS_PER_RANGE_REQUEST: u64 = 32;
// This is the same as a DatabaseBlock but the Arc allows us to avoid an unnecessary clone.
enum LoadedBeaconBlock<E: EthSpec> {
Full(Arc<SignedBeaconBlock<E>>),
Blinded(Box<SignedBlindedBeaconBlock<E>>),
}
type LoadResult<E> = Result<Option<LoadedBeaconBlock<E>>, BeaconChainError>;
type BlockResult<E> = Result<Option<Arc<SignedBeaconBlock<E>>>, BeaconChainError>;
enum RequestState<E: EthSpec> {
UnSent(Vec<BlockParts<E>>),
Sent(HashMap<Hash256, Arc<BlockResult<E>>>),
}
struct BodiesByRange<E: EthSpec> {
start: u64,
count: u64,
state: RequestState<E>,
}
// stores the components of a block for future re-construction in a small form
struct BlockParts<E: EthSpec> {
blinded_block: Box<SignedBlindedBeaconBlock<E>>,
header: Box<ExecutionPayloadHeader<E>>,
body: Option<Box<ExecutionPayloadBodyV1<E>>>,
}
impl<E: EthSpec> BlockParts<E> {
pub fn new(
blinded: Box<SignedBlindedBeaconBlock<E>>,
header: ExecutionPayloadHeader<E>,
) -> Self {
Self {
blinded_block: blinded,
header: Box::new(header),
body: None,
}
}
pub fn root(&self) -> Hash256 {
self.blinded_block.canonical_root()
}
pub fn slot(&self) -> Slot {
self.blinded_block.message().slot()
}
pub fn block_hash(&self) -> ExecutionBlockHash {
self.header.block_hash()
}
}
fn reconstruct_default_header_block<E: EthSpec>(
blinded_block: Box<SignedBlindedBeaconBlock<E>>,
header_from_block: ExecutionPayloadHeader<E>,
spec: &ChainSpec,
) -> BlockResult<E> {
let fork = blinded_block
.fork_name(spec)
.map_err(BeaconChainError::InconsistentFork)?;
let payload: ExecutionPayload<E> = match fork {
ForkName::Merge => ExecutionPayloadMerge::default().into(),
ForkName::Capella => ExecutionPayloadCapella::default().into(),
ForkName::Eip4844 => ExecutionPayloadEip4844::default().into(),
ForkName::Base | ForkName::Altair => {
return Err(Error::PayloadReconstruction(format!(
"Block with fork variant {} has execution payload",
fork
))
.into())
}
};
let header_from_payload = ExecutionPayloadHeader::from(payload.to_ref());
if header_from_payload == header_from_block {
blinded_block
.try_into_full_block(Some(payload))
.ok_or(BeaconChainError::AddPayloadLogicError)
.map(Arc::new)
.map(Some)
} else {
Err(BeaconChainError::InconsistentPayloadReconstructed {
slot: blinded_block.slot(),
exec_block_hash: header_from_block.block_hash(),
canonical_transactions_root: header_from_block.transactions_root(),
reconstructed_transactions_root: header_from_payload.transactions_root(),
})
}
}
fn reconstruct_blocks<E: EthSpec>(
block_map: &mut HashMap<Hash256, Arc<BlockResult<E>>>,
block_parts_with_bodies: HashMap<Hash256, BlockParts<E>>,
log: &Logger,
) {
for (root, block_parts) in block_parts_with_bodies {
if let Some(payload_body) = block_parts.body {
match payload_body.to_payload(block_parts.header.as_ref().clone()) {
Ok(payload) => {
let header_from_payload = ExecutionPayloadHeader::from(payload.to_ref());
if header_from_payload == *block_parts.header {
block_map.insert(
root,
Arc::new(
block_parts
.blinded_block
.try_into_full_block(Some(payload))
.ok_or(BeaconChainError::AddPayloadLogicError)
.map(Arc::new)
.map(Some),
),
);
} else {
let error = BeaconChainError::InconsistentPayloadReconstructed {
slot: block_parts.blinded_block.slot(),
exec_block_hash: block_parts.header.block_hash(),
canonical_transactions_root: block_parts.header.transactions_root(),
reconstructed_transactions_root: header_from_payload
.transactions_root(),
};
debug!(log, "Failed to reconstruct block"; "root" => ?root, "error" => ?error);
block_map.insert(root, Arc::new(Err(error)));
}
}
Err(string) => {
block_map.insert(
root,
Arc::new(Err(Error::PayloadReconstruction(string).into())),
);
}
}
} else {
block_map.insert(
root,
Arc::new(Err(BeaconChainError::BlockHashMissingFromExecutionLayer(
block_parts.block_hash(),
))),
);
}
}
}
impl<E: EthSpec> BodiesByRange<E> {
pub fn new(maybe_block_parts: Option<BlockParts<E>>) -> Self {
if let Some(block_parts) = maybe_block_parts {
Self {
start: block_parts.header.block_number(),
count: 1,
state: RequestState::UnSent(vec![block_parts]),
}
} else {
Self {
start: 0,
count: 0,
state: RequestState::UnSent(vec![]),
}
}
}
pub fn is_unsent(&self) -> bool {
matches!(self.state, RequestState::UnSent(_))
}
pub fn push_block_parts(&mut self, block_parts: BlockParts<E>) -> Result<(), BlockParts<E>> {
if self.count == BLOCKS_PER_RANGE_REQUEST {
return Err(block_parts);
}
match &mut self.state {
RequestState::Sent(_) => Err(block_parts),
RequestState::UnSent(blocks_parts_vec) => {
let block_number = block_parts.header.block_number();
if self.count == 0 {
self.start = block_number;
self.count = 1;
blocks_parts_vec.push(block_parts);
Ok(())
} else {
// need to figure out if this block fits in the request
if block_number < self.start
|| self.start + BLOCKS_PER_RANGE_REQUEST <= block_number
{
return Err(block_parts);
}
blocks_parts_vec.push(block_parts);
if self.start + self.count <= block_number {
self.count = block_number - self.start + 1;
}
Ok(())
}
}
}
}
async fn execute(&mut self, execution_layer: &ExecutionLayer<E>, log: &Logger) {
if let RequestState::UnSent(blocks_parts_ref) = &mut self.state {
let block_parts_vec = std::mem::take(blocks_parts_ref);
let mut block_map = HashMap::new();
match execution_layer
.get_payload_bodies_by_range(self.start, self.count)
.await
{
Ok(bodies) => {
let mut range_map = (self.start..(self.start + self.count))
.zip(bodies.into_iter().chain(std::iter::repeat(None)))
.collect::<HashMap<_, _>>();
let mut with_bodies = HashMap::new();
for mut block_parts in block_parts_vec {
with_bodies
// it's possible the same block is requested twice, using
// or_insert_with() skips duplicates
.entry(block_parts.root())
.or_insert_with(|| {
let block_number = block_parts.header.block_number();
block_parts.body =
range_map.remove(&block_number).flatten().map(Box::new);
block_parts
});
}
reconstruct_blocks(&mut block_map, with_bodies, log);
}
Err(e) => {
let block_result =
Arc::new(Err(Error::BlocksByRangeFailure(Box::new(e)).into()));
debug!(log, "Payload bodies by range failure"; "error" => ?block_result);
for block_parts in block_parts_vec {
block_map.insert(block_parts.root(), block_result.clone());
}
}
}
self.state = RequestState::Sent(block_map);
}
}
pub async fn get_block_result(
&mut self,
root: &Hash256,
execution_layer: &ExecutionLayer<E>,
log: &Logger,
) -> Option<Arc<BlockResult<E>>> {
self.execute(execution_layer, log).await;
if let RequestState::Sent(map) = &self.state {
return map.get(root).cloned();
}
// Shouldn't reach this point
None
}
}
#[derive(Clone)]
enum EngineRequest<E: EthSpec> {
ByRange(Arc<RwLock<BodiesByRange<E>>>),
// When we already have the data or there's an error
NoRequest(Arc<RwLock<HashMap<Hash256, Arc<BlockResult<E>>>>>),
}
impl<E: EthSpec> EngineRequest<E> {
pub fn new_by_range() -> Self {
Self::ByRange(Arc::new(RwLock::new(BodiesByRange::new(None))))
}
pub fn new_no_request() -> Self {
Self::NoRequest(Arc::new(RwLock::new(HashMap::new())))
}
pub async fn is_unsent(&self) -> bool {
match self {
Self::ByRange(bodies_by_range) => bodies_by_range.read().await.is_unsent(),
Self::NoRequest(_) => false,
}
}
pub async fn push_block_parts(&mut self, block_parts: BlockParts<E>, log: &Logger) {
match self {
Self::ByRange(bodies_by_range) => {
let mut request = bodies_by_range.write().await;
if let Err(block_parts) = request.push_block_parts(block_parts) {
drop(request);
let new_by_range = BodiesByRange::new(Some(block_parts));
*self = Self::ByRange(Arc::new(RwLock::new(new_by_range)));
}
}
Self::NoRequest(_) => {
// this should _never_ happen
crit!(
log,
"Please notify the devs";
"beacon_block_streamer" => "push_block_parts called on NoRequest Variant",
);
}
}
}
pub async fn push_block_result(
&mut self,
root: Hash256,
block_result: BlockResult<E>,
log: &Logger,
) {
// this function will only fail if something is seriously wrong
match self {
Self::ByRange(_) => {
// this should _never_ happen
crit!(
log,
"Please notify the devs";
"beacon_block_streamer" => "push_block_result called on ByRange",
);
}
Self::NoRequest(results) => {
results.write().await.insert(root, Arc::new(block_result));
}
}
}
pub async fn get_block_result(
&self,
root: &Hash256,
execution_layer: &ExecutionLayer<E>,
log: &Logger,
) -> Arc<BlockResult<E>> {
match self {
Self::ByRange(by_range) => {
by_range
.write()
.await
.get_block_result(root, execution_layer, log)
.await
}
Self::NoRequest(map) => map.read().await.get(root).cloned(),
}
.unwrap_or_else(|| {
crit!(
log,
"Please notify the devs";
"beacon_block_streamer" => "block_result not found in request",
"root" => ?root,
);
Arc::new(Err(Error::BlockResultNotFound.into()))
})
}
}
pub struct BeaconBlockStreamer<T: BeaconChainTypes> {
execution_layer: ExecutionLayer<T::EthSpec>,
check_early_attester_cache: CheckEarlyAttesterCache,
beacon_chain: Arc<BeaconChain<T>>,
}
impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
pub fn new(
beacon_chain: &Arc<BeaconChain<T>>,
check_early_attester_cache: CheckEarlyAttesterCache,
) -> Result<Self, BeaconChainError> {
let execution_layer = beacon_chain
.execution_layer
.as_ref()
.ok_or(BeaconChainError::ExecutionLayerMissing)?
.clone();
Ok(Self {
execution_layer,
check_early_attester_cache,
beacon_chain: beacon_chain.clone(),
})
}
fn check_early_attester_cache(
&self,
root: Hash256,
) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> {
if self.check_early_attester_cache == CheckEarlyAttesterCache::Yes {
self.beacon_chain.early_attester_cache.get_block(root)
} else {
None
}
}
fn load_payloads(&self, block_roots: Vec<Hash256>) -> Vec<(Hash256, LoadResult<T::EthSpec>)> {
let mut db_blocks = Vec::new();
for root in block_roots {
if let Some(cached_block) = self
.check_early_attester_cache(root)
.map(LoadedBeaconBlock::Full)
{
db_blocks.push((root, Ok(Some(cached_block))));
continue;
}
match self.beacon_chain.store.try_get_full_block(&root) {
Err(e) => db_blocks.push((root, Err(e.into()))),
Ok(opt_block) => db_blocks.push((
root,
Ok(opt_block.map(|db_block| match db_block {
DatabaseBlock::Full(block) => LoadedBeaconBlock::Full(Arc::new(block)),
DatabaseBlock::Blinded(block) => {
LoadedBeaconBlock::Blinded(Box::new(block))
}
})),
)),
}
}
db_blocks
}
/// Pre-process the loaded blocks into execution engine requests.
///
/// The purpose of this function is to separate the blocks into 2 categories:
/// 1) no_request - when we already have the full block or there's an error
/// 2) blocks_by_range - used for blinded blocks
///
/// The function returns a vector of block roots in the same order as requested
/// along with the engine request that each root corresponds to.
async fn get_requests(
&self,
payloads: Vec<(Hash256, LoadResult<T::EthSpec>)>,
) -> Vec<(Hash256, EngineRequest<T::EthSpec>)> {
let mut ordered_block_roots = Vec::new();
let mut requests = HashMap::new();
// we sort the by range blocks by slot before adding them to the
// request as it should *better* optimize the number of blocks that
// can fit in the same request
let mut by_range_blocks: Vec<BlockParts<T::EthSpec>> = vec![];
let mut no_request = EngineRequest::new_no_request();
for (root, load_result) in payloads {
// preserve the order of the requested blocks
ordered_block_roots.push(root);
let block_result = match load_result {
Err(e) => Err(e),
Ok(None) => Ok(None),
Ok(Some(LoadedBeaconBlock::Full(full_block))) => Ok(Some(full_block)),
Ok(Some(LoadedBeaconBlock::Blinded(blinded_block))) => {
match blinded_block
.message()
.execution_payload()
.map(|payload| payload.to_execution_payload_header())
{
Ok(header) => {
if header.block_hash() == ExecutionBlockHash::zero() {
reconstruct_default_header_block(
blinded_block,
header,
&self.beacon_chain.spec,
)
} else {
// Add the block to the set requiring a by-range request.
let block_parts = BlockParts::new(blinded_block, header);
by_range_blocks.push(block_parts);
continue;
}
}
Err(e) => Err(BeaconChainError::BeaconStateError(e)),
}
}
};
no_request
.push_block_result(root, block_result, &self.beacon_chain.log)
.await;
requests.insert(root, no_request.clone());
}
// Now deal with the by_range requests. Sort them in order of increasing slot
let mut by_range = EngineRequest::<T::EthSpec>::new_by_range();
by_range_blocks.sort_by_key(|block_parts| block_parts.slot());
for block_parts in by_range_blocks {
let root = block_parts.root();
by_range
.push_block_parts(block_parts, &self.beacon_chain.log)
.await;
requests.insert(root, by_range.clone());
}
let mut result = vec![];
for root in ordered_block_roots {
if let Some(request) = requests.get(&root) {
result.push((root, request.clone()))
} else {
crit!(
self.beacon_chain.log,
"Please notify the devs";
"beacon_block_streamer" => "request not found",
"root" => ?root,
);
no_request
.push_block_result(
root,
Err(Error::RequestNotFound.into()),
&self.beacon_chain.log,
)
.await;
result.push((root, no_request.clone()));
}
}
result
}
// used when the execution engine doesn't support the payload bodies methods
async fn stream_blocks_fallback(
&self,
block_roots: Vec<Hash256>,
sender: UnboundedSender<(Hash256, Arc<BlockResult<T::EthSpec>>)>,
) {
debug!(
self.beacon_chain.log,
"Using slower fallback method of eth_getBlockByHash()"
);
for root in block_roots {
let cached_block = self.check_early_attester_cache(root);
let block_result = if cached_block.is_some() {
Ok(cached_block)
} else {
self.beacon_chain
.get_block(&root)
.await
.map(|opt_block| opt_block.map(Arc::new))
};
if sender.send((root, Arc::new(block_result))).is_err() {
break;
}
}
}
async fn stream_blocks(
&self,
block_roots: Vec<Hash256>,
sender: UnboundedSender<(Hash256, Arc<BlockResult<T::EthSpec>>)>,
) {
let n_roots = block_roots.len();
let mut n_success = 0usize;
let mut n_sent = 0usize;
let mut engine_requests = 0usize;
let payloads = self.load_payloads(block_roots);
let requests = self.get_requests(payloads).await;
for (root, request) in requests {
if request.is_unsent().await {
engine_requests += 1;
}
let result = request
.get_block_result(&root, &self.execution_layer, &self.beacon_chain.log)
.await;
let successful = result
.as_ref()
.as_ref()
.map(|opt| opt.is_some())
.unwrap_or(false);
if sender.send((root, result)).is_err() {
break;
} else {
n_sent += 1;
if successful {
n_success += 1;
}
}
}
debug!(
self.beacon_chain.log,
"BeaconBlockStreamer finished";
"requested blocks" => n_roots,
"sent" => n_sent,
"succeeded" => n_success,
"failed" => (n_sent - n_success),
"engine requests" => engine_requests,
);
}
pub async fn stream(
self,
block_roots: Vec<Hash256>,
sender: UnboundedSender<(Hash256, Arc<BlockResult<T::EthSpec>>)>,
) {
match self
.execution_layer
.get_engine_capabilities(None)
.await
.map_err(Box::new)
.map_err(BeaconChainError::EngineGetCapabilititesFailed)
{
Ok(engine_capabilities) => {
if engine_capabilities.get_payload_bodies_by_range_v1 {
self.stream_blocks(block_roots, sender).await;
} else {
// use the fallback method
self.stream_blocks_fallback(block_roots, sender).await;
}
}
Err(e) => {
send_errors(block_roots, sender, e).await;
}
}
}
pub fn launch_stream(
self,
block_roots: Vec<Hash256>,
executor: &TaskExecutor,
) -> impl Stream<Item = (Hash256, Arc<BlockResult<T::EthSpec>>)> {
let (block_tx, block_rx) = mpsc::unbounded_channel();
debug!(
self.beacon_chain.log,
"Launching a BeaconBlockStreamer";
"blocks" => block_roots.len(),
);
executor.spawn(self.stream(block_roots, block_tx), "get_blocks_sender");
UnboundedReceiverStream::new(block_rx)
}
}
async fn send_errors<E: EthSpec>(
block_roots: Vec<Hash256>,
sender: UnboundedSender<(Hash256, Arc<BlockResult<E>>)>,
beacon_chain_error: BeaconChainError,
) {
let result = Arc::new(Err(beacon_chain_error));
for root in block_roots {
if sender.send((root, result.clone())).is_err() {
break;
}
}
}
impl From<Error> for BeaconChainError {
fn from(value: Error) -> Self {
BeaconChainError::BlockStreamerError(value)
}
}
#[cfg(test)]
mod tests {
use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckEarlyAttesterCache};
use crate::test_utils::{test_spec, BeaconChainHarness, EphemeralHarnessType};
use execution_layer::test_utils::{Block, DEFAULT_ENGINE_CAPABILITIES};
use execution_layer::EngineCapabilities;
use lazy_static::lazy_static;
use std::time::Duration;
use tokio::sync::mpsc;
use types::{ChainSpec, Epoch, EthSpec, Hash256, Keypair, MinimalEthSpec, Slot};
const VALIDATOR_COUNT: usize = 48;
lazy_static! {
/// A cached set of keys.
static ref KEYPAIRS: Vec<Keypair> = types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT);
}
fn get_harness(
validator_count: usize,
spec: ChainSpec,
) -> BeaconChainHarness<EphemeralHarnessType<MinimalEthSpec>> {
let harness = BeaconChainHarness::builder(MinimalEthSpec)
.spec(spec)
.keypairs(KEYPAIRS[0..validator_count].to_vec())
.logger(logging::test_logger())
.fresh_ephemeral_store()
.mock_execution_layer()
.build();
harness.advance_slot();
harness
}
#[tokio::test]
async fn check_all_blocks_from_altair_to_capella() {
let slots_per_epoch = MinimalEthSpec::slots_per_epoch() as usize;
let num_epochs = 8;
let bellatrix_fork_epoch = 2usize;
let capella_fork_epoch = 4usize;
let num_blocks_produced = num_epochs * slots_per_epoch;
let mut spec = test_spec::<MinimalEthSpec>();
spec.altair_fork_epoch = Some(Epoch::new(0));
spec.bellatrix_fork_epoch = Some(Epoch::new(bellatrix_fork_epoch as u64));
spec.capella_fork_epoch = Some(Epoch::new(capella_fork_epoch as u64));
let harness = get_harness(VALIDATOR_COUNT, spec);
// go to bellatrix fork
harness
.extend_slots(bellatrix_fork_epoch * slots_per_epoch)
.await;
// extend half an epoch
harness.extend_slots(slots_per_epoch / 2).await;
// trigger merge
harness
.execution_block_generator()
.move_to_terminal_block()
.expect("should move to terminal block");
let timestamp = harness.get_timestamp_at_slot() + harness.spec.seconds_per_slot;
harness
.execution_block_generator()
.modify_last_block(|block| {
if let Block::PoW(terminal_block) = block {
terminal_block.timestamp = timestamp;
}
});
// finish out merge epoch
harness.extend_slots(slots_per_epoch / 2).await;
// finish rest of epochs
harness
.extend_slots((num_epochs - 1 - bellatrix_fork_epoch) * slots_per_epoch)
.await;
let head = harness.chain.head_snapshot();
let state = &head.beacon_state;
assert_eq!(
state.slot(),
Slot::new(num_blocks_produced as u64),
"head should be at the current slot"
);
assert_eq!(
state.current_epoch(),
num_blocks_produced as u64 / MinimalEthSpec::slots_per_epoch(),
"head should be at the expected epoch"
);
assert_eq!(
state.current_justified_checkpoint().epoch,
state.current_epoch() - 1,
"the head should be justified one behind the current epoch"
);
assert_eq!(
state.finalized_checkpoint().epoch,
state.current_epoch() - 2,
"the head should be finalized two behind the current epoch"
);
let block_roots: Vec<Hash256> = harness
.chain
.forwards_iter_block_roots(Slot::new(0))
.expect("should get iter")
.map(Result::unwrap)
.map(|(root, _)| root)
.collect();
let mut expected_blocks = vec![];
// get all blocks the old fashioned way
for root in &block_roots {
let block = harness
.chain
.get_block(root)
.await
.expect("should get block")
.expect("block should exist");
expected_blocks.push(block);
}
for epoch in 0..num_epochs {
let start = epoch * slots_per_epoch;
let mut epoch_roots = vec![Hash256::zero(); slots_per_epoch];
epoch_roots[..].clone_from_slice(&block_roots[start..(start + slots_per_epoch)]);
let streamer = BeaconBlockStreamer::new(&harness.chain, CheckEarlyAttesterCache::No)
.expect("should create streamer");
let (block_tx, mut block_rx) = mpsc::unbounded_channel();
streamer.stream(epoch_roots.clone(), block_tx).await;
for (i, expected_root) in epoch_roots.into_iter().enumerate() {
let (found_root, found_block_result) =
block_rx.recv().await.expect("should get block");
assert_eq!(
found_root, expected_root,
"expected block root should match"
);
match found_block_result.as_ref() {
Ok(maybe_block) => {
let found_block = maybe_block.clone().expect("should have a block");
let expected_block = expected_blocks
.get(start + i)
.expect("should get expected block");
assert_eq!(
found_block.as_ref(),
expected_block,
"expected block should match found block"
);
}
Err(e) => panic!("Error retrieving block {}: {:?}", expected_root, e),
}
}
}
}
#[tokio::test]
async fn check_fallback_altair_to_capella() {
let slots_per_epoch = MinimalEthSpec::slots_per_epoch() as usize;
let num_epochs = 8;
let bellatrix_fork_epoch = 2usize;
let capella_fork_epoch = 4usize;
let num_blocks_produced = num_epochs * slots_per_epoch;
let mut spec = test_spec::<MinimalEthSpec>();
spec.altair_fork_epoch = Some(Epoch::new(0));
spec.bellatrix_fork_epoch = Some(Epoch::new(bellatrix_fork_epoch as u64));
spec.capella_fork_epoch = Some(Epoch::new(capella_fork_epoch as u64));
let harness = get_harness(VALIDATOR_COUNT, spec);
// modify execution engine so it doesn't support engine_payloadBodiesBy* methods
let mock_execution_layer = harness.mock_execution_layer.as_ref().unwrap();
mock_execution_layer
.server
.set_engine_capabilities(EngineCapabilities {
get_payload_bodies_by_hash_v1: false,
get_payload_bodies_by_range_v1: false,
..DEFAULT_ENGINE_CAPABILITIES
});
// refresh capabilities cache
harness
.chain
.execution_layer
.as_ref()
.unwrap()
.get_engine_capabilities(Some(Duration::ZERO))
.await
.unwrap();
// go to bellatrix fork
harness
.extend_slots(bellatrix_fork_epoch * slots_per_epoch)
.await;
// extend half an epoch
harness.extend_slots(slots_per_epoch / 2).await;
// trigger merge
harness
.execution_block_generator()
.move_to_terminal_block()
.expect("should move to terminal block");
let timestamp = harness.get_timestamp_at_slot() + harness.spec.seconds_per_slot;
harness
.execution_block_generator()
.modify_last_block(|block| {
if let Block::PoW(terminal_block) = block {
terminal_block.timestamp = timestamp;
}
});
// finish out merge epoch
harness.extend_slots(slots_per_epoch / 2).await;
// finish rest of epochs
harness
.extend_slots((num_epochs - 1 - bellatrix_fork_epoch) * slots_per_epoch)
.await;
let head = harness.chain.head_snapshot();
let state = &head.beacon_state;
assert_eq!(
state.slot(),
Slot::new(num_blocks_produced as u64),
"head should be at the current slot"
);
assert_eq!(
state.current_epoch(),
num_blocks_produced as u64 / MinimalEthSpec::slots_per_epoch(),
"head should be at the expected epoch"
);
assert_eq!(
state.current_justified_checkpoint().epoch,
state.current_epoch() - 1,
"the head should be justified one behind the current epoch"
);
assert_eq!(
state.finalized_checkpoint().epoch,
state.current_epoch() - 2,
"the head should be finalized two behind the current epoch"
);
let block_roots: Vec<Hash256> = harness
.chain
.forwards_iter_block_roots(Slot::new(0))
.expect("should get iter")
.map(Result::unwrap)
.map(|(root, _)| root)
.collect();
let mut expected_blocks = vec![];
// get all blocks the old fashioned way
for root in &block_roots {
let block = harness
.chain
.get_block(root)
.await
.expect("should get block")
.expect("block should exist");
expected_blocks.push(block);
}
for epoch in 0..num_epochs {
let start = epoch * slots_per_epoch;
let mut epoch_roots = vec![Hash256::zero(); slots_per_epoch];
epoch_roots[..].clone_from_slice(&block_roots[start..(start + slots_per_epoch)]);
let streamer = BeaconBlockStreamer::new(&harness.chain, CheckEarlyAttesterCache::No)
.expect("should create streamer");
let (block_tx, mut block_rx) = mpsc::unbounded_channel();
streamer.stream(epoch_roots.clone(), block_tx).await;
for (i, expected_root) in epoch_roots.into_iter().enumerate() {
let (found_root, found_block_result) =
block_rx.recv().await.expect("should get block");
assert_eq!(
found_root, expected_root,
"expected block root should match"
);
match found_block_result.as_ref() {
Ok(maybe_block) => {
let found_block = maybe_block.clone().expect("should have a block");
let expected_block = expected_blocks
.get(start + i)
.expect("should get expected block");
assert_eq!(
found_block.as_ref(),
expected_block,
"expected block should match found block"
);
}
Err(e) => panic!("Error retrieving block {}: {:?}", expected_root, e),
}
}
}
}
}

View File

@@ -4,6 +4,7 @@ use crate::attestation_verification::{
VerifiedUnaggregatedAttestation,
};
use crate::attester_cache::{AttesterCache, AttesterCacheKey};
use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckEarlyAttesterCache};
use crate::beacon_proposer_cache::compute_proposer_duties_from_head;
use crate::beacon_proposer_cache::BeaconProposerCache;
use crate::blob_cache::BlobCache;
@@ -76,7 +77,7 @@ use itertools::Itertools;
use kzg::Kzg;
use operation_pool::{AttestationRef, OperationPool, PersistedOperationPool, ReceivedPreCapella};
use parking_lot::{Mutex, RwLock};
use proto_array::{CountUnrealizedFull, DoNotReOrg, ProposerHeadError};
use proto_array::{DoNotReOrg, ProposerHeadError};
use safe_arith::SafeArith;
use slasher::Slasher;
use slog::{crit, debug, error, info, trace, warn, Logger};
@@ -106,6 +107,7 @@ use store::{
DatabaseBlock, Error as DBError, HotColdDB, KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp,
};
use task_executor::{ShutdownReason, TaskExecutor};
use tokio_stream::Stream;
use tree_hash::TreeHash;
use types::beacon_state::CloneConfig;
use types::blobs_sidecar::KzgCommitments;
@@ -485,7 +487,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn load_fork_choice(
store: BeaconStore<T>,
reset_payload_statuses: ResetPayloadStatuses,
count_unrealized_full: CountUnrealizedFull,
spec: &ChainSpec,
log: &Logger,
) -> Result<Option<BeaconForkChoice<T>>, Error> {
@@ -502,7 +503,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
persisted_fork_choice.fork_choice,
reset_payload_statuses,
fc_store,
count_unrealized_full,
spec,
log,
)?))
@@ -949,14 +949,42 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// ## Errors
///
/// May return a database error.
pub async fn get_block_checking_early_attester_cache(
&self,
block_root: &Hash256,
) -> Result<Option<Arc<SignedBeaconBlock<T::EthSpec>>>, Error> {
if let Some(block) = self.early_attester_cache.get_block(*block_root) {
return Ok(Some(block));
}
Ok(self.get_block(block_root).await?.map(Arc::new))
pub fn get_blocks_checking_early_attester_cache(
self: &Arc<Self>,
block_roots: Vec<Hash256>,
executor: &TaskExecutor,
) -> Result<
impl Stream<
Item = (
Hash256,
Arc<Result<Option<Arc<SignedBeaconBlock<T::EthSpec>>>, Error>>,
),
>,
Error,
> {
Ok(
BeaconBlockStreamer::<T>::new(self, CheckEarlyAttesterCache::Yes)?
.launch_stream(block_roots, executor),
)
}
pub fn get_blocks(
self: &Arc<Self>,
block_roots: Vec<Hash256>,
executor: &TaskExecutor,
) -> Result<
impl Stream<
Item = (
Hash256,
Arc<Result<Option<Arc<SignedBeaconBlock<T::EthSpec>>>, Error>>,
),
>,
Error,
> {
Ok(
BeaconBlockStreamer::<T>::new(self, CheckEarlyAttesterCache::No)?
.launch_stream(block_roots, executor),
)
}
pub async fn get_blobs_checking_early_attester_cache(
@@ -1944,7 +1972,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.slot()?,
verified.indexed_attestation(),
AttestationFromBlock::False,
&self.spec,
)
.map_err(Into::into)
}
@@ -2911,7 +2938,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&state,
payload_verification_status,
&self.spec,
count_unrealized.and(self.config.count_unrealized.into()),
count_unrealized,
)
.map_err(|e| BlockError::BeaconChainError(e.into()))?;
}
@@ -3054,7 +3081,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
ResetPayloadStatuses::always_reset_conditionally(
self.config.always_reset_payload_statuses,
),
self.config.count_unrealized_full,
&self.store,
&self.spec,
&self.log,

View File

@@ -20,6 +20,14 @@ use types::{
Hash256, Slot,
};
/// Ensure this justified checkpoint has an epoch of 0 so that it is never
/// greater than the justified checkpoint and enshrined as the actual justified
/// checkpoint.
const JUNK_BEST_JUSTIFIED_CHECKPOINT: Checkpoint = Checkpoint {
epoch: Epoch::new(0),
root: Hash256::repeat_byte(0),
};
#[derive(Debug)]
pub enum Error {
UnableToReadSlot,
@@ -144,7 +152,6 @@ pub struct BeaconForkChoiceStore<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<
finalized_checkpoint: Checkpoint,
justified_checkpoint: Checkpoint,
justified_balances: JustifiedBalances,
best_justified_checkpoint: Checkpoint,
unrealized_justified_checkpoint: Checkpoint,
unrealized_finalized_checkpoint: Checkpoint,
proposer_boost_root: Hash256,
@@ -194,7 +201,6 @@ where
justified_checkpoint,
justified_balances,
finalized_checkpoint,
best_justified_checkpoint: justified_checkpoint,
unrealized_justified_checkpoint: justified_checkpoint,
unrealized_finalized_checkpoint: finalized_checkpoint,
proposer_boost_root: Hash256::zero(),
@@ -212,7 +218,7 @@ where
finalized_checkpoint: self.finalized_checkpoint,
justified_checkpoint: self.justified_checkpoint,
justified_balances: self.justified_balances.effective_balances.clone(),
best_justified_checkpoint: self.best_justified_checkpoint,
best_justified_checkpoint: JUNK_BEST_JUSTIFIED_CHECKPOINT,
unrealized_justified_checkpoint: self.unrealized_justified_checkpoint,
unrealized_finalized_checkpoint: self.unrealized_finalized_checkpoint,
proposer_boost_root: self.proposer_boost_root,
@@ -234,7 +240,6 @@ where
finalized_checkpoint: persisted.finalized_checkpoint,
justified_checkpoint: persisted.justified_checkpoint,
justified_balances,
best_justified_checkpoint: persisted.best_justified_checkpoint,
unrealized_justified_checkpoint: persisted.unrealized_justified_checkpoint,
unrealized_finalized_checkpoint: persisted.unrealized_finalized_checkpoint,
proposer_boost_root: persisted.proposer_boost_root,
@@ -277,10 +282,6 @@ where
&self.justified_balances
}
fn best_justified_checkpoint(&self) -> &Checkpoint {
&self.best_justified_checkpoint
}
fn finalized_checkpoint(&self) -> &Checkpoint {
&self.finalized_checkpoint
}
@@ -333,10 +334,6 @@ where
Ok(())
}
fn set_best_justified_checkpoint(&mut self, checkpoint: Checkpoint) {
self.best_justified_checkpoint = checkpoint
}
fn set_unrealized_justified_checkpoint(&mut self, checkpoint: Checkpoint) {
self.unrealized_justified_checkpoint = checkpoint;
}

View File

@@ -1550,7 +1550,6 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
current_slot,
indexed_attestation,
AttestationFromBlock::True,
&chain.spec,
) {
Ok(()) => Ok(()),
// Ignore invalid attestations whilst importing attestations from a block. The

View File

@@ -19,7 +19,7 @@ use crate::{
};
use eth1::Config as Eth1Config;
use execution_layer::ExecutionLayer;
use fork_choice::{ForkChoice, ResetPayloadStatuses};
use fork_choice::{CountUnrealized, ForkChoice, ResetPayloadStatuses};
use futures::channel::mpsc::Sender;
use kzg::{Kzg, TrustedSetup};
use operation_pool::{OperationPool, PersistedOperationPool};
@@ -269,7 +269,6 @@ where
ResetPayloadStatuses::always_reset_conditionally(
self.chain_config.always_reset_payload_statuses,
),
self.chain_config.count_unrealized_full,
&self.spec,
log,
)
@@ -388,7 +387,6 @@ where
&genesis.beacon_block,
&genesis.beacon_state,
current_slot,
self.chain_config.count_unrealized_full,
&self.spec,
)
.map_err(|e| format!("Unable to initialize ForkChoice: {:?}", e))?;
@@ -507,7 +505,6 @@ where
&snapshot.beacon_block,
&snapshot.beacon_state,
current_slot,
self.chain_config.count_unrealized_full,
&self.spec,
)
.map_err(|e| format!("Unable to initialize ForkChoice: {:?}", e))?;
@@ -698,8 +695,7 @@ where
store.clone(),
Some(current_slot),
&self.spec,
self.chain_config.count_unrealized.into(),
self.chain_config.count_unrealized_full,
CountUnrealized::True,
)?;
}
@@ -782,6 +778,7 @@ where
let genesis_time = head_snapshot.beacon_state.genesis_time();
let head_for_snapshot_cache = head_snapshot.clone();
let canonical_head = CanonicalHead::new(fork_choice, Arc::new(head_snapshot));
let shuffling_cache_size = self.chain_config.shuffling_cache_size;
let beacon_chain = BeaconChain {
spec: self.spec,
@@ -835,7 +832,7 @@ where
DEFAULT_SNAPSHOT_CACHE_SIZE,
head_for_snapshot_cache,
)),
shuffling_cache: TimeoutRwLock::new(ShufflingCache::new()),
shuffling_cache: TimeoutRwLock::new(ShufflingCache::new(shuffling_cache_size)),
eth1_finalization_cache: TimeoutRwLock::new(Eth1FinalizationCache::new(log.clone())),
beacon_proposer_cache: <_>::default(),
block_times_cache: <_>::default(),

View File

@@ -45,8 +45,7 @@ use crate::{
};
use eth2::types::{EventKind, SseChainReorg, SseFinalizedCheckpoint, SseHead, SseLateHead};
use fork_choice::{
CountUnrealizedFull, ExecutionStatus, ForkChoiceView, ForkchoiceUpdateParameters, ProtoBlock,
ResetPayloadStatuses,
ExecutionStatus, ForkChoiceView, ForkchoiceUpdateParameters, ProtoBlock, ResetPayloadStatuses,
};
use itertools::process_results;
use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
@@ -285,19 +284,13 @@ impl<T: BeaconChainTypes> CanonicalHead<T> {
// defensive programming.
mut fork_choice_write_lock: RwLockWriteGuard<BeaconForkChoice<T>>,
reset_payload_statuses: ResetPayloadStatuses,
count_unrealized_full: CountUnrealizedFull,
store: &BeaconStore<T>,
spec: &ChainSpec,
log: &Logger,
) -> Result<(), Error> {
let fork_choice = <BeaconChain<T>>::load_fork_choice(
store.clone(),
reset_payload_statuses,
count_unrealized_full,
spec,
log,
)?
.ok_or(Error::MissingPersistedForkChoice)?;
let fork_choice =
<BeaconChain<T>>::load_fork_choice(store.clone(), reset_payload_statuses, spec, log)?
.ok_or(Error::MissingPersistedForkChoice)?;
let fork_choice_view = fork_choice.cached_fork_choice_view();
let beacon_block_root = fork_choice_view.head_block_root;
let beacon_block = store

View File

@@ -1,4 +1,4 @@
pub use proto_array::{CountUnrealizedFull, ReOrgThreshold};
pub use proto_array::ReOrgThreshold;
use serde_derive::{Deserialize, Serialize};
use std::time::Duration;
use types::{Checkpoint, Epoch};
@@ -48,16 +48,11 @@ pub struct ChainConfig {
pub builder_fallback_epochs_since_finalization: usize,
/// Whether any chain health checks should be considered when deciding whether to use the builder API.
pub builder_fallback_disable_checks: bool,
/// When set to `true`, weigh the "unrealized" FFG progression when choosing a head in fork
/// choice.
pub count_unrealized: bool,
/// When set to `true`, forget any valid/invalid/optimistic statuses in fork choice during start
/// up.
pub always_reset_payload_statuses: bool,
/// Whether to apply paranoid checks to blocks proposed by this beacon node.
pub paranoid_block_proposal: bool,
/// Whether to strictly count unrealized justified votes.
pub count_unrealized_full: CountUnrealizedFull,
/// Optionally set timeout for calls to checkpoint sync endpoint.
pub checkpoint_sync_url_timeout: u64,
/// The offset before the start of a proposal slot at which payload attributes should be sent.
@@ -67,6 +62,8 @@ pub struct ChainConfig {
pub prepare_payload_lookahead: Duration,
/// Use EL-free optimistic sync for the finalized part of the chain.
pub optimistic_finalized_sync: bool,
/// The size of the shuffling cache,
pub shuffling_cache_size: usize,
/// Whether to send payload attributes every slot, regardless of connected proposers.
///
/// This is useful for block builders and testing.
@@ -89,14 +86,13 @@ impl Default for ChainConfig {
builder_fallback_skips_per_epoch: 8,
builder_fallback_epochs_since_finalization: 3,
builder_fallback_disable_checks: false,
count_unrealized: true,
always_reset_payload_statuses: false,
paranoid_block_proposal: false,
count_unrealized_full: CountUnrealizedFull::default(),
checkpoint_sync_url_timeout: 60,
prepare_payload_lookahead: Duration::from_secs(4),
// This value isn't actually read except in tests.
optimistic_finalized_sync: true,
shuffling_cache_size: crate::shuffling_cache::DEFAULT_CACHE_SIZE,
always_prepare_payload: false,
}
}

View File

@@ -1,4 +1,5 @@
use crate::attester_cache::Error as AttesterCacheError;
use crate::beacon_block_streamer::Error as BlockStreamerError;
use crate::beacon_chain::ForkChoiceError;
use crate::beacon_fork_choice_store::Error as ForkChoiceStoreError;
use crate::eth1_chain::Error as Eth1ChainError;
@@ -143,6 +144,7 @@ pub enum BeaconChainError {
ExecutionLayerMissing,
BlockVariantLacksExecutionPayload(Hash256),
ExecutionLayerErrorPayloadReconstruction(ExecutionBlockHash, Box<execution_layer::Error>),
EngineGetCapabilititesFailed(Box<execution_layer::Error>),
BlockHashMissingFromExecutionLayer(ExecutionBlockHash),
InconsistentPayloadReconstructed {
slot: Slot,
@@ -150,6 +152,7 @@ pub enum BeaconChainError {
canonical_transactions_root: Hash256,
reconstructed_transactions_root: Hash256,
},
BlockStreamerError(BlockStreamerError),
AddPayloadLogicError,
ExecutionForkChoiceUpdateFailed(execution_layer::Error),
PrepareProposerFailed(BlockProcessingError),

View File

@@ -1,7 +1,6 @@
use crate::{BeaconForkChoiceStore, BeaconSnapshot};
use fork_choice::{CountUnrealized, ForkChoice, PayloadVerificationStatus};
use itertools::process_results;
use proto_array::CountUnrealizedFull;
use slog::{info, warn, Logger};
use state_processing::state_advance::complete_state_advance;
use state_processing::{
@@ -102,7 +101,6 @@ pub fn reset_fork_choice_to_finalization<E: EthSpec, Hot: ItemStore<E>, Cold: It
current_slot: Option<Slot>,
spec: &ChainSpec,
count_unrealized_config: CountUnrealized,
count_unrealized_full_config: CountUnrealizedFull,
) -> Result<ForkChoice<BeaconForkChoiceStore<E, Hot, Cold>, E>, String> {
// Fetch finalized block.
let finalized_checkpoint = head_state.finalized_checkpoint();
@@ -156,7 +154,6 @@ pub fn reset_fork_choice_to_finalization<E: EthSpec, Hot: ItemStore<E>, Cold: It
&finalized_snapshot.beacon_block,
&finalized_snapshot.beacon_state,
current_slot,
count_unrealized_full_config,
spec,
)
.map_err(|e| format!("Unable to reset fork choice for revert: {:?}", e))?;

View File

@@ -2,6 +2,7 @@ pub mod attestation_rewards;
pub mod attestation_verification;
mod attester_cache;
pub mod beacon_block_reward;
mod beacon_block_streamer;
mod beacon_chain;
mod beacon_fork_choice_store;
pub mod beacon_proposer_cache;
@@ -42,7 +43,7 @@ mod persisted_fork_choice;
mod pre_finalization_cache;
pub mod proposer_prep_service;
pub mod schema_change;
mod shuffling_cache;
pub mod shuffling_cache;
mod snapshot_cache;
pub mod state_advance_timer;
pub mod sync_committee_rewards;
@@ -59,7 +60,7 @@ pub use self::beacon_chain::{
INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON, MAXIMUM_GOSSIP_CLOCK_DISPARITY,
};
pub use self::beacon_snapshot::BeaconSnapshot;
pub use self::chain_config::{ChainConfig, CountUnrealizedFull};
pub use self::chain_config::ChainConfig;
pub use self::errors::{BeaconChainError, BlockProductionError};
pub use self::historical_blocks::HistoricalBlockError;
pub use attestation_verification::Error as AttestationError;

View File

@@ -3,6 +3,7 @@ mod migration_schema_v12;
mod migration_schema_v13;
mod migration_schema_v14;
mod migration_schema_v15;
mod migration_schema_v16;
use crate::beacon_chain::{BeaconChainTypes, ETH1_CACHE_DB_KEY};
use crate::eth1_chain::SszEth1;
@@ -132,6 +133,14 @@ pub fn migrate_schema<T: BeaconChainTypes>(
let ops = migration_schema_v15::downgrade_from_v15::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(15), SchemaVersion(16)) => {
let ops = migration_schema_v16::upgrade_to_v16::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(16), SchemaVersion(15)) => {
let ops = migration_schema_v16::downgrade_from_v16::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
// Anything else is an error.
(_, _) => Err(HotColdDBError::UnsupportedSchemaVersion {
target_version: to,

View File

@@ -0,0 +1,46 @@
use crate::beacon_chain::{BeaconChainTypes, FORK_CHOICE_DB_KEY};
use crate::persisted_fork_choice::PersistedForkChoiceV11;
use slog::{debug, Logger};
use std::sync::Arc;
use store::{Error, HotColdDB, KeyValueStoreOp, StoreItem};
pub fn upgrade_to_v16<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
drop_balances_cache::<T>(db, log)
}
pub fn downgrade_from_v16<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
drop_balances_cache::<T>(db, log)
}
/// Drop the balances cache from the fork choice store.
///
/// There aren't any type-level changes in this schema migration, however the
/// way that we compute the `JustifiedBalances` has changed due to:
/// https://github.com/sigp/lighthouse/pull/3962
pub fn drop_balances_cache<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
let mut persisted_fork_choice = db
.get_item::<PersistedForkChoiceV11>(&FORK_CHOICE_DB_KEY)?
.ok_or_else(|| Error::SchemaMigrationError("fork choice missing from database".into()))?;
debug!(
log,
"Dropping fork choice balances cache";
"item_count" => persisted_fork_choice.fork_choice_store.balances_cache.items.len()
);
// Drop all items in the balances cache.
persisted_fork_choice.fork_choice_store.balances_cache = <_>::default();
let kv_op = persisted_fork_choice.as_kv_store_op(FORK_CHOICE_DB_KEY);
Ok(vec![kv_op])
}

View File

@@ -9,7 +9,7 @@ use types::{beacon_state::CommitteeCache, AttestationShufflingId, Epoch, Hash256
/// Each entry should be `8 + 800,000 = 800,008` bytes in size with 100k validators. (8-byte hash +
/// 100k indices). Therefore, this cache should be approx `16 * 800,008 = 12.8 MB`. (Note: this
/// ignores a few extra bytes in the caches that should be insignificant compared to the indices).
const CACHE_SIZE: usize = 16;
pub const DEFAULT_CACHE_SIZE: usize = 16;
/// The maximum number of concurrent committee cache "promises" that can be issued. In effect, this
/// limits the number of concurrent states that can be loaded into memory for the committee cache.
@@ -54,9 +54,9 @@ pub struct ShufflingCache {
}
impl ShufflingCache {
pub fn new() -> Self {
pub fn new(cache_size: usize) -> Self {
Self {
cache: LruCache::new(CACHE_SIZE),
cache: LruCache::new(cache_size),
}
}
@@ -172,7 +172,7 @@ impl ToArcCommitteeCache for Arc<CommitteeCache> {
impl Default for ShufflingCache {
fn default() -> Self {
Self::new()
Self::new(DEFAULT_CACHE_SIZE)
}
}
@@ -249,7 +249,7 @@ mod test {
fn resolved_promise() {
let (committee_a, _) = committee_caches();
let id_a = shuffling_id(1);
let mut cache = ShufflingCache::new();
let mut cache = ShufflingCache::default();
// Create a promise.
let sender = cache.create_promise(id_a.clone()).unwrap();
@@ -276,7 +276,7 @@ mod test {
#[test]
fn unresolved_promise() {
let id_a = shuffling_id(1);
let mut cache = ShufflingCache::new();
let mut cache = ShufflingCache::default();
// Create a promise.
let sender = cache.create_promise(id_a.clone()).unwrap();
@@ -301,7 +301,7 @@ mod test {
fn two_promises() {
let (committee_a, committee_b) = committee_caches();
let (id_a, id_b) = (shuffling_id(1), shuffling_id(2));
let mut cache = ShufflingCache::new();
let mut cache = ShufflingCache::default();
// Create promise A.
let sender_a = cache.create_promise(id_a.clone()).unwrap();
@@ -355,7 +355,7 @@ mod test {
#[test]
fn too_many_promises() {
let mut cache = ShufflingCache::new();
let mut cache = ShufflingCache::default();
for i in 0..MAX_CONCURRENT_PROMISES {
cache.create_promise(shuffling_id(i as u64)).unwrap();

View File

@@ -500,7 +500,7 @@ async fn unaggregated_attestations_added_to_fork_choice_some_none() {
// Move forward a slot so all queued attestations can be processed.
harness.advance_slot();
fork_choice
.update_time(harness.chain.slot().unwrap(), &harness.chain.spec)
.update_time(harness.chain.slot().unwrap())
.unwrap();
let validator_slots: Vec<(usize, Slot)> = (0..VALIDATOR_COUNT)
@@ -614,7 +614,7 @@ async fn unaggregated_attestations_added_to_fork_choice_all_updated() {
// Move forward a slot so all queued attestations can be processed.
harness.advance_slot();
fork_choice
.update_time(harness.chain.slot().unwrap(), &harness.chain.spec)
.update_time(harness.chain.slot().unwrap())
.unwrap();
let validators: Vec<usize> = (0..VALIDATOR_COUNT).collect();