mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-20 05:14:35 +00:00
Block production optimisation + metrics
This commit is contained in:
@@ -50,6 +50,7 @@ use std::sync::Arc;
|
|||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
use store::iter::{BlockRootsIterator, ParentRootBlockIterator, StateRootsIterator};
|
use store::iter::{BlockRootsIterator, ParentRootBlockIterator, StateRootsIterator};
|
||||||
use store::{Error as DBError, HotColdDB, KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp};
|
use store::{Error as DBError, HotColdDB, KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp};
|
||||||
|
use types::beacon_state::CloneConfig;
|
||||||
use types::*;
|
use types::*;
|
||||||
|
|
||||||
pub type ForkChoiceError = fork_choice::Error<crate::ForkChoiceStoreError>;
|
pub type ForkChoiceError = fork_choice::Error<crate::ForkChoiceStoreError>;
|
||||||
@@ -542,7 +543,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
/// is the state as it was when the head block was received, which could be some slots prior to
|
/// is the state as it was when the head block was received, which could be some slots prior to
|
||||||
/// now.
|
/// now.
|
||||||
pub fn head(&self) -> Result<BeaconSnapshot<T::EthSpec>, Error> {
|
pub fn head(&self) -> Result<BeaconSnapshot<T::EthSpec>, Error> {
|
||||||
self.with_head(|head| Ok(head.clone_with_only_committee_caches()))
|
self.with_head(|head| Ok(head.clone_with(CloneConfig::committee_caches_only())))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Apply a function to the canonical head without cloning it.
|
/// Apply a function to the canonical head without cloning it.
|
||||||
@@ -786,37 +787,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the block proposer for a given slot.
|
|
||||||
///
|
|
||||||
/// Information is read from the present `beacon_state` shuffling, only information from the
|
|
||||||
/// present epoch is available.
|
|
||||||
pub fn block_proposer(&self, slot: Slot) -> Result<usize, Error> {
|
|
||||||
let epoch = |slot: Slot| slot.epoch(T::EthSpec::slots_per_epoch());
|
|
||||||
let head_state = &self.head()?.beacon_state;
|
|
||||||
|
|
||||||
let mut state = if epoch(slot) == epoch(head_state.slot) {
|
|
||||||
self.head()?.beacon_state
|
|
||||||
} else {
|
|
||||||
// The block proposer shuffling is not affected by the state roots, so we don't need to
|
|
||||||
// calculate them.
|
|
||||||
self.state_at_slot(slot, StateSkipConfig::WithoutStateRoots)?
|
|
||||||
};
|
|
||||||
|
|
||||||
state.build_committee_cache(RelativeEpoch::Current, &self.spec)?;
|
|
||||||
|
|
||||||
if epoch(state.slot) != epoch(slot) {
|
|
||||||
return Err(Error::InvariantViolated(format!(
|
|
||||||
"Epochs in consistent in proposer lookup: state: {}, requested: {}",
|
|
||||||
epoch(state.slot),
|
|
||||||
epoch(slot)
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
|
|
||||||
state
|
|
||||||
.get_beacon_proposer_index(slot, &self.spec)
|
|
||||||
.map_err(Into::into)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the attestation duties for a given validator index.
|
/// Returns the attestation duties for a given validator index.
|
||||||
///
|
///
|
||||||
/// Information is read from the current state, so only information from the present and prior
|
/// Information is read from the current state, so only information from the present and prior
|
||||||
@@ -1771,9 +1741,36 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
slot: Slot,
|
slot: Slot,
|
||||||
validator_graffiti: Option<Graffiti>,
|
validator_graffiti: Option<Graffiti>,
|
||||||
) -> Result<BeaconBlockAndState<T::EthSpec>, BlockProductionError> {
|
) -> Result<BeaconBlockAndState<T::EthSpec>, BlockProductionError> {
|
||||||
let state = self
|
metrics::inc_counter(&metrics::BLOCK_PRODUCTION_REQUESTS);
|
||||||
.state_at_slot(slot - 1, StateSkipConfig::WithStateRoots)
|
let _complete_timer = metrics::start_timer(&metrics::BLOCK_PRODUCTION_TIMES);
|
||||||
.map_err(|_| BlockProductionError::UnableToProduceAtSlot(slot))?;
|
|
||||||
|
// Producing a block requires the tree hash cache, so clone a full state corresponding to
|
||||||
|
// the head from the snapshot cache. Unfortunately we can't move the snapshot out of the
|
||||||
|
// cache (which would be fast), because we need to re-process the block after it has been
|
||||||
|
// signed.
|
||||||
|
let state_load_timer = metrics::start_timer(&metrics::BLOCK_PRODUCTION_STATE_LOAD_TIMES);
|
||||||
|
let head_block_root = self
|
||||||
|
.head_info()
|
||||||
|
.map_err(BlockProductionError::UnableToGetHeadInfo)?
|
||||||
|
.block_root;
|
||||||
|
let state = if let Some(snapshot) = self
|
||||||
|
.snapshot_cache
|
||||||
|
.try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
|
||||||
|
.and_then(|snapshot_cache| {
|
||||||
|
snapshot_cache.get_cloned(head_block_root, CloneConfig::all())
|
||||||
|
}) {
|
||||||
|
snapshot.beacon_state
|
||||||
|
} else {
|
||||||
|
warn!(
|
||||||
|
self.log,
|
||||||
|
"Block production cache miss";
|
||||||
|
"message" => "this block is more likely to be orphaned",
|
||||||
|
"slot" => slot,
|
||||||
|
);
|
||||||
|
self.state_at_slot(slot, StateSkipConfig::WithStateRoots)
|
||||||
|
.map_err(|_| BlockProductionError::UnableToProduceAtSlot(slot))?
|
||||||
|
};
|
||||||
|
drop(state_load_timer);
|
||||||
|
|
||||||
self.produce_block_on_state(state, slot, randao_reveal, validator_graffiti)
|
self.produce_block_on_state(state, slot, randao_reveal, validator_graffiti)
|
||||||
}
|
}
|
||||||
@@ -1793,14 +1790,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
randao_reveal: Signature,
|
randao_reveal: Signature,
|
||||||
validator_graffiti: Option<Graffiti>,
|
validator_graffiti: Option<Graffiti>,
|
||||||
) -> Result<BeaconBlockAndState<T::EthSpec>, BlockProductionError> {
|
) -> Result<BeaconBlockAndState<T::EthSpec>, BlockProductionError> {
|
||||||
metrics::inc_counter(&metrics::BLOCK_PRODUCTION_REQUESTS);
|
|
||||||
let timer = metrics::start_timer(&metrics::BLOCK_PRODUCTION_TIMES);
|
|
||||||
|
|
||||||
let eth1_chain = self
|
let eth1_chain = self
|
||||||
.eth1_chain
|
.eth1_chain
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.ok_or(BlockProductionError::NoEth1ChainConnection)?;
|
.ok_or(BlockProductionError::NoEth1ChainConnection)?;
|
||||||
|
|
||||||
|
let slot_timer = metrics::start_timer(&metrics::BLOCK_PRODUCTION_SLOT_PROCESS_TIMES);
|
||||||
// If required, transition the new state to the present slot.
|
// If required, transition the new state to the present slot.
|
||||||
//
|
//
|
||||||
// Note: supplying some `state_root` when it it is known would be a cheap and easy
|
// Note: supplying some `state_root` when it it is known would be a cheap and easy
|
||||||
@@ -1808,6 +1803,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
while state.slot < produce_at_slot {
|
while state.slot < produce_at_slot {
|
||||||
per_slot_processing(&mut state, None, &self.spec)?;
|
per_slot_processing(&mut state, None, &self.spec)?;
|
||||||
}
|
}
|
||||||
|
drop(slot_timer);
|
||||||
|
|
||||||
state.build_committee_cache(RelativeEpoch::Current, &self.spec)?;
|
state.build_committee_cache(RelativeEpoch::Current, &self.spec)?;
|
||||||
|
|
||||||
@@ -1844,6 +1840,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
|
|
||||||
// Iterate through the naive aggregation pool and ensure all the attestations from there
|
// Iterate through the naive aggregation pool and ensure all the attestations from there
|
||||||
// are included in the operation pool.
|
// are included in the operation pool.
|
||||||
|
let unagg_import_timer =
|
||||||
|
metrics::start_timer(&metrics::BLOCK_PRODUCTION_UNAGGREGATED_TIMES);
|
||||||
for attestation in self.naive_aggregation_pool.read().iter() {
|
for attestation in self.naive_aggregation_pool.read().iter() {
|
||||||
if let Err(e) = self.op_pool.insert_attestation(
|
if let Err(e) = self.op_pool.insert_attestation(
|
||||||
attestation.clone(),
|
attestation.clone(),
|
||||||
@@ -1859,6 +1857,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
drop(unagg_import_timer);
|
||||||
|
|
||||||
// Override the beacon node's graffiti with graffiti from the validator, if present.
|
// Override the beacon node's graffiti with graffiti from the validator, if present.
|
||||||
let graffiti = match validator_graffiti {
|
let graffiti = match validator_graffiti {
|
||||||
@@ -1866,6 +1865,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
None => self.graffiti,
|
None => self.graffiti,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let attestation_packing_timer =
|
||||||
|
metrics::start_timer(&metrics::BLOCK_PRODUCTION_ATTESTATION_TIMES);
|
||||||
|
let attestations = self
|
||||||
|
.op_pool
|
||||||
|
.get_attestations(&state, attestation_filter, &self.spec)
|
||||||
|
.map_err(BlockProductionError::OpPoolError)?
|
||||||
|
.into();
|
||||||
|
drop(attestation_packing_timer);
|
||||||
|
|
||||||
let mut block = SignedBeaconBlock {
|
let mut block = SignedBeaconBlock {
|
||||||
message: BeaconBlock {
|
message: BeaconBlock {
|
||||||
slot: state.slot,
|
slot: state.slot,
|
||||||
@@ -1878,11 +1886,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
graffiti,
|
graffiti,
|
||||||
proposer_slashings: proposer_slashings.into(),
|
proposer_slashings: proposer_slashings.into(),
|
||||||
attester_slashings: attester_slashings.into(),
|
attester_slashings: attester_slashings.into(),
|
||||||
attestations: self
|
attestations,
|
||||||
.op_pool
|
|
||||||
.get_attestations(&state, attestation_filter, &self.spec)
|
|
||||||
.map_err(BlockProductionError::OpPoolError)?
|
|
||||||
.into(),
|
|
||||||
deposits,
|
deposits,
|
||||||
voluntary_exits: self.op_pool.get_voluntary_exits(&state, &self.spec).into(),
|
voluntary_exits: self.op_pool.get_voluntary_exits(&state, &self.spec).into(),
|
||||||
},
|
},
|
||||||
@@ -1891,6 +1895,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
signature: Signature::empty(),
|
signature: Signature::empty(),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let process_timer = metrics::start_timer(&metrics::BLOCK_PRODUCTION_PROCESS_TIMES);
|
||||||
per_block_processing(
|
per_block_processing(
|
||||||
&mut state,
|
&mut state,
|
||||||
&block,
|
&block,
|
||||||
@@ -1898,13 +1903,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
BlockSignatureStrategy::NoVerification,
|
BlockSignatureStrategy::NoVerification,
|
||||||
&self.spec,
|
&self.spec,
|
||||||
)?;
|
)?;
|
||||||
|
drop(process_timer);
|
||||||
|
|
||||||
|
let state_root_timer = metrics::start_timer(&metrics::BLOCK_PRODUCTION_STATE_ROOT_TIMES);
|
||||||
let state_root = state.update_tree_hash_cache()?;
|
let state_root = state.update_tree_hash_cache()?;
|
||||||
|
drop(state_root_timer);
|
||||||
|
|
||||||
block.message.state_root = state_root;
|
block.message.state_root = state_root;
|
||||||
|
|
||||||
metrics::inc_counter(&metrics::BLOCK_PRODUCTION_SUCCESSES);
|
metrics::inc_counter(&metrics::BLOCK_PRODUCTION_SUCCESSES);
|
||||||
metrics::stop_timer(timer);
|
|
||||||
|
|
||||||
trace!(
|
trace!(
|
||||||
self.log,
|
self.log,
|
||||||
@@ -1950,7 +1957,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
let new_head = self
|
let new_head = self
|
||||||
.snapshot_cache
|
.snapshot_cache
|
||||||
.try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
|
.try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
|
||||||
.and_then(|snapshot_cache| snapshot_cache.get_cloned(beacon_block_root))
|
.and_then(|snapshot_cache| {
|
||||||
|
snapshot_cache.get_cloned(beacon_block_root, CloneConfig::committee_caches_only())
|
||||||
|
})
|
||||||
.map::<Result<_, Error>, _>(Ok)
|
.map::<Result<_, Error>, _>(Ok)
|
||||||
.unwrap_or_else(|| {
|
.unwrap_or_else(|| {
|
||||||
let beacon_block = self
|
let beacon_block = self
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
use serde_derive::Serialize;
|
use serde_derive::Serialize;
|
||||||
use ssz_derive::{Decode, Encode};
|
use ssz_derive::{Decode, Encode};
|
||||||
use types::{BeaconState, EthSpec, Hash256, SignedBeaconBlock};
|
use types::{beacon_state::CloneConfig, BeaconState, EthSpec, Hash256, SignedBeaconBlock};
|
||||||
|
|
||||||
/// Represents some block and its associated state. Generally, this will be used for tracking the
|
/// Represents some block and its associated state. Generally, this will be used for tracking the
|
||||||
/// head, justified head and finalized head.
|
/// head, justified head and finalized head.
|
||||||
@@ -42,11 +42,11 @@ impl<E: EthSpec> BeaconSnapshot<E> {
|
|||||||
self.beacon_state_root = beacon_state_root;
|
self.beacon_state_root = beacon_state_root;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn clone_with_only_committee_caches(&self) -> Self {
|
pub fn clone_with(&self, clone_config: CloneConfig) -> Self {
|
||||||
Self {
|
Self {
|
||||||
beacon_block: self.beacon_block.clone(),
|
beacon_block: self.beacon_block.clone(),
|
||||||
beacon_block_root: self.beacon_block_root,
|
beacon_block_root: self.beacon_block_root,
|
||||||
beacon_state: self.beacon_state.clone_with_only_committee_caches(),
|
beacon_state: self.beacon_state.clone_with(clone_config),
|
||||||
beacon_state_root: self.beacon_state_root,
|
beacon_state_root: self.beacon_state_root,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -112,6 +112,7 @@ easy_from_to!(ForkChoiceStoreError, BeaconChainError);
|
|||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum BlockProductionError {
|
pub enum BlockProductionError {
|
||||||
|
UnableToGetHeadInfo(BeaconChainError),
|
||||||
UnableToGetBlockRootFromState,
|
UnableToGetBlockRootFromState,
|
||||||
UnableToReadSlot,
|
UnableToReadSlot,
|
||||||
UnableToProduceAtSlot(Slot),
|
UnableToProduceAtSlot(Slot),
|
||||||
|
|||||||
@@ -68,6 +68,30 @@ lazy_static! {
|
|||||||
);
|
);
|
||||||
pub static ref BLOCK_PRODUCTION_TIMES: Result<Histogram> =
|
pub static ref BLOCK_PRODUCTION_TIMES: Result<Histogram> =
|
||||||
try_create_histogram("beacon_block_production_seconds", "Full runtime of block production");
|
try_create_histogram("beacon_block_production_seconds", "Full runtime of block production");
|
||||||
|
pub static ref BLOCK_PRODUCTION_STATE_LOAD_TIMES: Result<Histogram> = try_create_histogram(
|
||||||
|
"beacon_block_production_state_load_seconds",
|
||||||
|
"Time taken to load the base state for block production"
|
||||||
|
);
|
||||||
|
pub static ref BLOCK_PRODUCTION_SLOT_PROCESS_TIMES: Result<Histogram> = try_create_histogram(
|
||||||
|
"beacon_block_production_slot_process_seconds",
|
||||||
|
"Time taken to advance the state to the block production slot"
|
||||||
|
);
|
||||||
|
pub static ref BLOCK_PRODUCTION_UNAGGREGATED_TIMES: Result<Histogram> = try_create_histogram(
|
||||||
|
"beacon_block_production_unaggregated_seconds",
|
||||||
|
"Time taken to import the naive aggregation pool for block production"
|
||||||
|
);
|
||||||
|
pub static ref BLOCK_PRODUCTION_ATTESTATION_TIMES: Result<Histogram> = try_create_histogram(
|
||||||
|
"beacon_block_production_attestation_seconds",
|
||||||
|
"Time taken to pack attestations into a block"
|
||||||
|
);
|
||||||
|
pub static ref BLOCK_PRODUCTION_PROCESS_TIMES: Result<Histogram> = try_create_histogram(
|
||||||
|
"beacon_block_production_process_seconds",
|
||||||
|
"Time taken to process the block produced"
|
||||||
|
);
|
||||||
|
pub static ref BLOCK_PRODUCTION_STATE_ROOT_TIMES: Result<Histogram> = try_create_histogram(
|
||||||
|
"beacon_block_production_state_root_seconds",
|
||||||
|
"Time taken to calculate the block's state root"
|
||||||
|
);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Block Statistics
|
* Block Statistics
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
use crate::BeaconSnapshot;
|
use crate::BeaconSnapshot;
|
||||||
use std::cmp;
|
use std::cmp;
|
||||||
use types::{Epoch, EthSpec, Hash256};
|
use types::{beacon_state::CloneConfig, Epoch, EthSpec, Hash256};
|
||||||
|
|
||||||
/// The default size of the cache.
|
/// The default size of the cache.
|
||||||
pub const DEFAULT_SNAPSHOT_CACHE_SIZE: usize = 4;
|
pub const DEFAULT_SNAPSHOT_CACHE_SIZE: usize = 4;
|
||||||
@@ -69,13 +69,16 @@ impl<T: EthSpec> SnapshotCache<T> {
|
|||||||
.map(|i| self.snapshots.remove(i))
|
.map(|i| self.snapshots.remove(i))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// If there is a snapshot with `block_root`, clone it (with only the committee caches) and
|
/// If there is a snapshot with `block_root`, clone it and return the clone.
|
||||||
/// return the clone.
|
pub fn get_cloned(
|
||||||
pub fn get_cloned(&self, block_root: Hash256) -> Option<BeaconSnapshot<T>> {
|
&self,
|
||||||
|
block_root: Hash256,
|
||||||
|
clone_config: CloneConfig,
|
||||||
|
) -> Option<BeaconSnapshot<T>> {
|
||||||
self.snapshots
|
self.snapshots
|
||||||
.iter()
|
.iter()
|
||||||
.find(|snapshot| snapshot.beacon_block_root == block_root)
|
.find(|snapshot| snapshot.beacon_block_root == block_root)
|
||||||
.map(|snapshot| snapshot.clone_with_only_committee_caches())
|
.map(|snapshot| snapshot.clone_with(clone_config))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Removes all snapshots from the queue that are less than or equal to the finalized epoch.
|
/// Removes all snapshots from the queue that are less than or equal to the finalized epoch.
|
||||||
|
|||||||
Reference in New Issue
Block a user