Merge branch 'master' into master-sf

This commit is contained in:
pawanjay176
2020-04-28 18:19:33 +05:30
164 changed files with 5444 additions and 2104 deletions

View File

@@ -8,6 +8,7 @@ use crate::events::{EventHandler, EventKind};
use crate::fork_choice::{Error as ForkChoiceError, ForkChoice};
use crate::head_tracker::HeadTracker;
use crate::metrics;
use crate::migrate::Migrate;
use crate::naive_aggregation_pool::{Error as NaiveAggregationError, NaiveAggregationPool};
use crate::persisted_beacon_chain::PersistedBeaconChain;
use crate::shuffling_cache::ShufflingCache;
@@ -28,19 +29,23 @@ use state_processing::{
};
use std::borrow::Cow;
use std::cmp::Ordering;
use std::collections::HashMap;
use std::collections::HashSet;
use std::io::prelude::*;
use std::sync::Arc;
use std::time::{Duration, Instant};
use store::iter::{
BlockRootsIterator, ReverseBlockRootIterator, ReverseStateRootIterator, StateRootsIterator,
BlockRootsIterator, ParentRootBlockIterator, ReverseBlockRootIterator,
ReverseStateRootIterator, StateRootsIterator,
};
use store::{Error as DBError, Migrate, Store};
use store::{Error as DBError, Store};
use types::*;
// Text included in blocks.
// Must be 32-bytes or panic.
//
// |-------must be this long------|
pub const GRAFFITI: &str = "sigp/lighthouse-0.1.1-prerelease";
pub const GRAFFITI: &str = "sigp/lighthouse-0.2.0-prerelease";
/// The time-out before failure during an operation to take a read/write RwLock on the canonical
/// head.
@@ -159,7 +164,7 @@ pub struct HeadInfo {
pub trait BeaconChainTypes: Send + Sync + 'static {
type Store: store::Store<Self::EthSpec>;
type StoreMigrator: store::Migrate<Self::Store, Self::EthSpec>;
type StoreMigrator: Migrate<Self::Store, Self::EthSpec>;
type SlotClock: slot_clock::SlotClock;
type Eth1Chain: Eth1ChainBackend<Self::EthSpec, Self::Store>;
type EthSpec: types::EthSpec;
@@ -199,7 +204,7 @@ pub struct BeaconChain<T: BeaconChainTypes> {
/// A handler for events generated by the beacon chain.
pub event_handler: T::EventHandler,
/// Used to track the heads of the beacon chain.
pub(crate) head_tracker: HeadTracker,
pub(crate) head_tracker: Arc<HeadTracker>,
/// A cache dedicated to block processing.
pub(crate) snapshot_cache: TimeoutRwLock<SnapshotCache<T::EthSpec>>,
/// Caches the shuffling for a given epoch and state root.
@@ -497,6 +502,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.head_tracker.heads()
}
pub fn knows_head(&self, block_hash: &SignedBeaconBlockHash) -> bool {
self.head_tracker.contains_head((*block_hash).into())
}
/// Returns the `BeaconState` at the given slot.
///
/// Returns `None` when the state is not found in the database or there is an error skipping
@@ -1115,11 +1124,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
})?;
let signature_set = indexed_attestation_signature_set_from_pubkeys(
|validator_index| {
pubkey_cache
.get(validator_index)
.map(|pk| Cow::Borrowed(pk.as_point()))
},
|validator_index| pubkey_cache.get(validator_index).map(Cow::Borrowed),
&attestation.signature,
&indexed_attestation,
&fork,
@@ -1230,6 +1235,76 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}
/// Check that the shuffling at `block_root` is equal to one of the shufflings of `state`.
///
/// The `target_epoch` argument determines which shuffling to check compatibility with, it
/// should be equal to the current or previous epoch of `state`, or else `false` will be
/// returned.
///
/// The compatibility check is designed to be fast: we check that the block that
/// determined the RANDAO mix for the `target_epoch` matches the ancestor of the block
/// identified by `block_root` (at that slot).
pub fn shuffling_is_compatible(
&self,
block_root: &Hash256,
target_epoch: Epoch,
state: &BeaconState<T::EthSpec>,
) -> bool {
let slots_per_epoch = T::EthSpec::slots_per_epoch();
let shuffling_lookahead = 1 + self.spec.min_seed_lookahead.as_u64();
// Shuffling can't have changed if we're in the first few epochs
if state.current_epoch() < shuffling_lookahead {
return true;
}
// Otherwise the shuffling is determined by the block at the end of the target epoch
// minus the shuffling lookahead (usually 2). We call this the "pivot".
// Any `target_epoch` other than the state's current or previous epoch is rejected
// outright, per the doc comment above.
let pivot_slot =
if target_epoch == state.previous_epoch() || target_epoch == state.current_epoch() {
(target_epoch - shuffling_lookahead).end_slot(slots_per_epoch)
} else {
return false;
};
// Resolve the pivot slot to a block root using the state's own block-roots history;
// failure here means the state cannot vouch for any shuffling, so be conservative.
let state_pivot_block_root = match state.get_block_root(pivot_slot) {
Ok(root) => *root,
Err(e) => {
warn!(
&self.log,
"Missing pivot block root for attestation";
"slot" => pivot_slot,
"error" => format!("{:?}", e),
);
return false;
}
};
// Use fork choice's view of the block DAG to quickly evaluate whether the attestation's
// pivot block is the same as the current state's pivot block. If it is, then the
// attestation's shuffling is the same as the current state's.
// To account for skipped slots, find the first block at *or before* the pivot slot.
let fork_choice_lock = self.fork_choice.core_proto_array();
let pivot_block_root = fork_choice_lock
.iter_block_roots(block_root)
.find(|(_, slot)| *slot <= pivot_slot)
.map(|(block_root, _)| block_root);
// Release the fork-choice lock before logging/comparing; nothing below needs it.
drop(fork_choice_lock);
match pivot_block_root {
Some(root) => root == state_pivot_block_root,
None => {
debug!(
&self.log,
"Discarding attestation because of missing ancestor";
"pivot_slot" => pivot_slot.as_u64(),
"block_root" => format!("{:?}", block_root),
);
false
}
}
}
/// Accept some exit and queue it for inclusion in an appropriate block.
pub fn process_voluntary_exit(
&self,
@@ -1365,6 +1440,20 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Err(BlockError::BlockIsAlreadyKnown) => continue,
// If the block is the genesis block, simply ignore this block.
Err(BlockError::GenesisBlock) => continue,
// If the block is for a finalized slot, simply ignore this block.
//
// The block is either:
//
// 1. In the canonical finalized chain.
// 2. In some non-canonical chain at a slot that has been finalized already.
//
// In the case of (1), there's no need to re-import and later blocks in this
// segment might be useful.
//
// In the case of (2), skipping the block is valid since we should never import it.
// However, we will potentially get a `ParentUnknown` on a later block. The sync
// protocol will need to ensure this is handled gracefully.
Err(BlockError::WouldRevertFinalizedSlot { .. }) => continue,
// If there was an error whilst determining if the block was invalid, return that
// error.
Err(BlockError::BeaconChainError(e)) => {
@@ -1445,7 +1534,34 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
block: SignedBeaconBlock<T::EthSpec>,
) -> Result<GossipVerifiedBlock<T>, BlockError> {
GossipVerifiedBlock::new(block, self)
let slot = block.message.slot;
let graffiti_string = String::from_utf8(block.message.body.graffiti[..].to_vec())
.unwrap_or_else(|_| format!("{:?}", &block.message.body.graffiti[..]));
match GossipVerifiedBlock::new(block, self) {
Ok(verified) => {
debug!(
self.log,
"Successfully processed gossip block";
"graffiti" => graffiti_string,
"slot" => slot,
"root" => format!("{:?}", verified.block_root()),
);
Ok(verified)
}
Err(e) => {
debug!(
self.log,
"Rejected gossip block";
"error" => format!("{:?}", e),
"graffiti" => graffiti_string,
"slot" => slot,
);
Err(e)
}
}
}
/// Returns `Ok(block_root)` if the given `unverified_block` was successfully verified and
@@ -1722,6 +1838,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.deposits_for_block_inclusion(&state, &eth1_data, &self.spec)?
.into();
// Map from attestation head block root to shuffling compatibility.
// Used to memoize the `attestation_shuffling_is_compatible` function.
let mut shuffling_filter_cache = HashMap::new();
let attestation_filter = |att: &&Attestation<T::EthSpec>| -> bool {
*shuffling_filter_cache
.entry((att.data.beacon_block_root, att.data.target.epoch))
.or_insert_with(|| {
self.shuffling_is_compatible(
&att.data.beacon_block_root,
att.data.target.epoch,
&state,
)
})
};
let mut block = SignedBeaconBlock {
message: BeaconBlock {
slot: state.slot,
@@ -1736,7 +1867,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
attester_slashings: attester_slashings.into(),
attestations: self
.op_pool
.get_attestations(&state, &self.spec)
.get_attestations(&state, attestation_filter, &self.spec)
.map_err(BlockProductionError::OpPoolError)?
.into(),
deposits,
@@ -1794,6 +1925,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let beacon_block_root = self.fork_choice.find_head(&self)?;
let current_head = self.head_info()?;
let old_finalized_root = current_head.finalized_checkpoint.root;
if beacon_block_root == current_head.block_root {
return Ok(());
@@ -1921,7 +2053,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
});
if new_finalized_epoch != old_finalized_epoch {
self.after_finalization(old_finalized_epoch, finalized_root)?;
self.after_finalization(
old_finalized_epoch,
finalized_root,
old_finalized_root.into(),
)?;
}
let _ = self.event_handler.register(EventKind::BeaconHeadChanged {
@@ -1950,6 +2086,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
old_finalized_epoch: Epoch,
finalized_block_root: Hash256,
old_finalized_root: SignedBeaconBlockHash,
) -> Result<(), Error> {
let finalized_block = self
.store
@@ -1989,10 +2126,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// TODO: configurable max finality distance
let max_finality_distance = 0;
self.store_migrator.freeze_to_state(
self.store_migrator.process_finalization(
finalized_block.state_root,
finalized_state,
max_finality_distance,
Arc::clone(&self.head_tracker),
old_finalized_root,
finalized_block_root.into(),
);
let _ = self.event_handler.register(EventKind::BeaconFinalization {
@@ -2076,6 +2216,100 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.slot_clock
.duration_to_slot(epoch.start_slot(T::EthSpec::slots_per_epoch()))
}
/// Writes the block DAG reachable from the tracked heads to `output` in Graphviz DOT
/// format, for debugging.
///
/// Node shapes: `box3d` = canonical head, `Msquare` = a block recorded as a finalized
/// checkpoint root, `box` = any other block.
///
/// Panics (via `unwrap`) on lock timeout or store errors — acceptable since this is a
/// debugging aid only.
pub fn dump_as_dot<W: Write>(&self, output: &mut W) {
let canonical_head_hash = self
.canonical_head
.try_read_for(HEAD_LOCK_TIMEOUT)
.ok_or_else(|| Error::CanonicalHeadLockTimeout)
.unwrap()
.beacon_block_root;
let mut visited: HashSet<Hash256> = HashSet::new();
let mut finalized_blocks: HashSet<Hash256> = HashSet::new();
let genesis_block_hash = Hash256::zero();
write!(output, "digraph beacon {{\n").unwrap();
write!(output, "\t_{:?}[label=\"genesis\"];\n", genesis_block_hash).unwrap();
// Canonical head needs to be processed first as otherwise finalized blocks aren't detected
// properly.
let heads = {
let mut heads = self.heads();
let canonical_head_index = heads
.iter()
.position(|(block_hash, _)| *block_hash == canonical_head_hash)
.unwrap();
// Move the canonical head to the front of the list without an O(n) shift.
let (canonical_head_hash, canonical_head_slot) =
heads.swap_remove(canonical_head_index);
heads.insert(0, (canonical_head_hash, canonical_head_slot));
heads
};
for (head_hash, _head_slot) in heads {
// Walk each head back towards genesis, stopping at the first block already
// emitted by a previously-processed head (shared ancestry).
for (block_hash, signed_beacon_block) in
ParentRootBlockIterator::new(&*self.store, head_hash)
{
if visited.contains(&block_hash) {
break;
}
visited.insert(block_hash);
// At epoch boundaries, load the post-state to learn which checkpoint root it
// considers finalized, so those blocks can be drawn specially below.
if signed_beacon_block.slot() % T::EthSpec::slots_per_epoch() == 0 {
let block = self.get_block(&block_hash).unwrap().unwrap();
let state = self
.get_state(&block.state_root(), Some(block.slot()))
.unwrap()
.unwrap();
finalized_blocks.insert(state.finalized_checkpoint.root);
}
if block_hash == canonical_head_hash {
write!(
output,
"\t_{:?}[label=\"{} ({})\" shape=box3d];\n",
block_hash,
block_hash,
signed_beacon_block.slot()
)
.unwrap();
} else if finalized_blocks.contains(&block_hash) {
write!(
output,
"\t_{:?}[label=\"{} ({})\" shape=Msquare];\n",
block_hash,
block_hash,
signed_beacon_block.slot()
)
.unwrap();
} else {
write!(
output,
"\t_{:?}[label=\"{} ({})\" shape=box];\n",
block_hash,
block_hash,
signed_beacon_block.slot()
)
.unwrap();
}
// Edge from each block to its parent.
write!(
output,
"\t_{:?} -> _{:?};\n",
block_hash,
signed_beacon_block.parent_root()
)
.unwrap();
}
}
write!(output, "}}\n").unwrap();
}
/// Convenience wrapper that renders the block DAG to the file at `file_name` in DOT format.
// Used for debugging
#[allow(dead_code)]
pub fn dump_dot_file(&self, file_name: &str) {
    let mut output = std::fs::File::create(file_name).unwrap();
    self.dump_as_dot(&mut output);
}
}
impl<T: BeaconChainTypes> Drop for BeaconChain<T> {

View File

@@ -49,19 +49,22 @@ use crate::{
metrics, BeaconChain, BeaconChainError, BeaconChainTypes, BeaconSnapshot,
};
use parking_lot::RwLockReadGuard;
use slog::{error, Logger};
use slot_clock::SlotClock;
use ssz::Encode;
use state_processing::{
block_signature_verifier::{
BlockSignatureVerifier, Error as BlockSignatureVerifierError, G1Point,
},
block_signature_verifier::{BlockSignatureVerifier, Error as BlockSignatureVerifierError},
per_block_processing, per_slot_processing, BlockProcessingError, BlockSignatureStrategy,
SlotProcessingError,
};
use std::borrow::Cow;
use std::fs;
use std::io::Write;
use store::{Error as DBError, StateBatch};
use tree_hash::TreeHash;
use types::{
BeaconBlock, BeaconState, BeaconStateError, ChainSpec, CloneConfig, EthSpec, Hash256,
RelativeEpoch, SignedBeaconBlock, Slot,
PublicKey, RelativeEpoch, SignedBeaconBlock, Slot,
};
mod block_processing_outcome;
@@ -71,6 +74,12 @@ pub use block_processing_outcome::BlockProcessingOutcome;
/// Maximum block slot number. Block with slots bigger than this constant will NOT be processed.
const MAXIMUM_BLOCK_SLOT_NUMBER: u64 = 4_294_967_296; // 2^32
/// If true, every time a block is processed the pre-state, post-state and block are written to SSZ
/// files in the temp directory.
///
/// Only useful for testing.
const WRITE_BLOCK_PROCESSING_SSZ: bool = cfg!(feature = "write_ssz_files");
/// Returned when a block was not verified. A block is not verified for two reasons:
///
/// - The block is malformed/invalid (indicated by all results other than `BeaconChainError`.
@@ -304,6 +313,10 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
Err(BlockError::ProposalSignatureInvalid)
}
}
/// Returns the canonical root of the gossip-verified block.
pub fn block_root(&self) -> Hash256 {
self.block_root
}
}
impl<T: BeaconChainTypes> IntoFullyVerifiedBlock<T> for GossipVerifiedBlock<T> {
@@ -517,6 +530,13 @@ impl<T: BeaconChainTypes> FullyVerifiedBlock<T> {
* invalid.
*/
write_state(
&format!("state_pre_block_{}", block_root),
&state,
&chain.log,
);
write_block(&block, block_root, &chain.log);
let core_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_CORE);
if let Err(err) = per_block_processing(
@@ -547,6 +567,12 @@ impl<T: BeaconChainTypes> FullyVerifiedBlock<T> {
metrics::stop_timer(state_root_timer);
write_state(
&format!("state_post_block_{}", block_root),
&state,
&chain.log,
);
/*
* Check to ensure the state root on the block matches the one we have calculated.
*/
@@ -785,7 +811,7 @@ fn get_signature_verifier<'a, E: EthSpec>(
state: &'a BeaconState<E>,
validator_pubkey_cache: &'a ValidatorPubkeyCache,
spec: &'a ChainSpec,
) -> BlockSignatureVerifier<'a, E, impl Fn(usize) -> Option<Cow<'a, G1Point>> + Clone> {
) -> BlockSignatureVerifier<'a, E, impl Fn(usize) -> Option<Cow<'a, PublicKey>> + Clone> {
BlockSignatureVerifier::new(
state,
move |validator_index| {
@@ -794,7 +820,7 @@ fn get_signature_verifier<'a, E: EthSpec>(
if validator_index < state.validators.len() {
validator_pubkey_cache
.get(validator_index)
.map(|pk| Cow::Borrowed(pk.as_point()))
.map(|pk| Cow::Borrowed(pk))
} else {
None
}
@@ -802,3 +828,46 @@ fn get_signature_verifier<'a, E: EthSpec>(
spec,
)
}
/// Dumps `state` as SSZ to a file under the system temp dir, for post-mortem debugging.
///
/// Only active when the `write_ssz_files` feature is enabled (see
/// `WRITE_BLOCK_PROCESSING_SSZ`). Errors are logged, never propagated, since this is a
/// best-effort debugging aid.
fn write_state<T: EthSpec>(prefix: &str, state: &BeaconState<T>, log: &Logger) {
    if WRITE_BLOCK_PROCESSING_SSZ {
        let root = state.tree_hash_root();
        let filename = format!("{}_slot_{}_root_{}.ssz", prefix, state.slot, root);
        let dir = std::env::temp_dir().join("lighthouse");
        // Best-effort: if this fails, `File::create` below will surface the error.
        let _ = fs::create_dir_all(&dir);
        let path = dir.join(filename);
        match fs::File::create(&path) {
            Ok(mut file) => {
                let _ = file.write_all(&state.as_ssz_bytes());
            }
            Err(e) => error!(
                log,
                "Failed to log state";
                "path" => format!("{:?}", path),
                "error" => format!("{:?}", e)
            ),
        }
    }
}
/// Dumps `block` as SSZ to a file under the system temp dir, for post-mortem debugging.
///
/// Only active when the `write_ssz_files` feature is enabled (see
/// `WRITE_BLOCK_PROCESSING_SSZ`). Errors are logged, never propagated, since this is a
/// best-effort debugging aid.
fn write_block<T: EthSpec>(block: &SignedBeaconBlock<T>, root: Hash256, log: &Logger) {
    if WRITE_BLOCK_PROCESSING_SSZ {
        // `root_{}` (underscore) keeps the naming consistent with `write_state`'s
        // `..._root_{}.ssz`; the previous `root{}` was an inconsistency.
        let filename = format!("block_slot_{}_root_{}.ssz", block.message.slot, root);
        let dir = std::env::temp_dir().join("lighthouse");
        // Best-effort: if this fails, `File::create` below will surface the error.
        let _ = fs::create_dir_all(&dir);
        let path = dir.join(filename);
        match fs::File::create(&path) {
            Ok(mut file) => {
                let _ = file.write_all(&block.as_ssz_bytes());
            }
            Err(e) => error!(
                log,
                "Failed to log block";
                "path" => format!("{:?}", path),
                "error" => format!("{:?}", e)
            ),
        }
    }
}

View File

@@ -4,7 +4,7 @@ use types::{Hash256, Slot};
/// This is a legacy object that is being kept around to reduce merge conflicts.
///
/// As soon as this is merged into master, it should be removed as soon as possible.
/// TODO: As soon as this is merged into master, it should be removed as soon as possible.
#[derive(Debug, PartialEq)]
pub enum BlockProcessingOutcome {
/// Block was valid and imported into the block graph.

View File

@@ -5,6 +5,7 @@ use crate::eth1_chain::{CachingEth1Backend, SszEth1};
use crate::events::NullEventHandler;
use crate::fork_choice::SszForkChoice;
use crate::head_tracker::HeadTracker;
use crate::migrate::Migrate;
use crate::persisted_beacon_chain::PersistedBeaconChain;
use crate::shuffling_cache::ShufflingCache;
use crate::snapshot_cache::{SnapshotCache, DEFAULT_SNAPSHOT_CACHE_SIZE};
@@ -47,7 +48,7 @@ impl<TStore, TStoreMigrator, TSlotClock, TEth1Backend, TEthSpec, TEventHandler>
for Witness<TStore, TStoreMigrator, TSlotClock, TEth1Backend, TEthSpec, TEventHandler>
where
TStore: Store<TEthSpec> + 'static,
TStoreMigrator: store::Migrate<TStore, TEthSpec> + 'static,
TStoreMigrator: Migrate<TStore, TEthSpec> + 'static,
TSlotClock: SlotClock + 'static,
TEth1Backend: Eth1ChainBackend<TEthSpec, TStore> + 'static,
TEthSpec: EthSpec + 'static,
@@ -97,7 +98,7 @@ impl<TStore, TStoreMigrator, TSlotClock, TEth1Backend, TEthSpec, TEventHandler>
>
where
TStore: Store<TEthSpec> + 'static,
TStoreMigrator: store::Migrate<TStore, TEthSpec> + 'static,
TStoreMigrator: Migrate<TStore, TEthSpec> + 'static,
TSlotClock: SlotClock + 'static,
TEth1Backend: Eth1ChainBackend<TEthSpec, TStore> + 'static,
TEthSpec: EthSpec + 'static,
@@ -229,7 +230,7 @@ where
.get::<PersistedBeaconChain>(&Hash256::from_slice(&BEACON_CHAIN_DB_KEY))
.map_err(|e| format!("DB error when reading persisted beacon chain: {:?}", e))?
.ok_or_else(|| {
"No persisted beacon chain found in store. Try deleting the .lighthouse/beacon dir."
"No persisted beacon chain found in store. Try purging the beacon chain database."
.to_string()
})?;
@@ -442,7 +443,7 @@ where
event_handler: self
.event_handler
.ok_or_else(|| "Cannot build without an event handler".to_string())?,
head_tracker: self.head_tracker.unwrap_or_default(),
head_tracker: Arc::new(self.head_tracker.unwrap_or_default()),
snapshot_cache: TimeoutRwLock::new(SnapshotCache::new(
DEFAULT_SNAPSHOT_CACHE_SIZE,
canonical_head,
@@ -475,7 +476,7 @@ impl<TStore, TStoreMigrator, TSlotClock, TEth1Backend, TEthSpec, TEventHandler>
>
where
TStore: Store<TEthSpec> + 'static,
TStoreMigrator: store::Migrate<TStore, TEthSpec> + 'static,
TStoreMigrator: Migrate<TStore, TEthSpec> + 'static,
TSlotClock: SlotClock + 'static,
TEth1Backend: Eth1ChainBackend<TEthSpec, TStore> + 'static,
TEthSpec: EthSpec + 'static,
@@ -545,7 +546,7 @@ impl<TStore, TStoreMigrator, TSlotClock, TEthSpec, TEventHandler>
>
where
TStore: Store<TEthSpec> + 'static,
TStoreMigrator: store::Migrate<TStore, TEthSpec> + 'static,
TStoreMigrator: Migrate<TStore, TEthSpec> + 'static,
TSlotClock: SlotClock + 'static,
TEthSpec: EthSpec + 'static,
TEventHandler: EventHandler<TEthSpec> + 'static,
@@ -583,7 +584,7 @@ impl<TStore, TStoreMigrator, TEth1Backend, TEthSpec, TEventHandler>
>
where
TStore: Store<TEthSpec> + 'static,
TStoreMigrator: store::Migrate<TStore, TEthSpec> + 'static,
TStoreMigrator: Migrate<TStore, TEthSpec> + 'static,
TEth1Backend: Eth1ChainBackend<TEthSpec, TStore> + 'static,
TEthSpec: EthSpec + 'static,
TEventHandler: EventHandler<TEthSpec> + 'static,
@@ -622,7 +623,7 @@ impl<TStore, TStoreMigrator, TSlotClock, TEth1Backend, TEthSpec>
>
where
TStore: Store<TEthSpec> + 'static,
TStoreMigrator: store::Migrate<TStore, TEthSpec> + 'static,
TStoreMigrator: Migrate<TStore, TEthSpec> + 'static,
TSlotClock: SlotClock + 'static,
TEth1Backend: Eth1ChainBackend<TEthSpec, TStore> + 'static,
TEthSpec: EthSpec + 'static,
@@ -654,12 +655,12 @@ fn genesis_block<T: EthSpec>(
#[cfg(test)]
mod test {
use super::*;
use crate::migrate::{MemoryStore, NullMigrator};
use eth2_hashing::hash;
use genesis::{generate_deterministic_keypairs, interop_genesis_state};
use sloggers::{null::NullLoggerBuilder, Build};
use ssz::Encode;
use std::time::Duration;
use store::{migrate::NullMigrator, MemoryStore};
use tempfile::tempdir;
use types::{EthSpec, MinimalEthSpec, Slot};

View File

@@ -43,6 +43,14 @@ pub enum Error {
///
/// The eth1 caches are stale, or a junk value was voted into the chain.
UnknownPreviousEth1BlockHash,
/// An arithmetic error occurred.
ArithError(safe_arith::ArithError),
}
impl From<safe_arith::ArithError> for Error {
fn from(e: safe_arith::ArithError) -> Self {
Self::ArithError(e)
}
}
#[derive(Encode, Decode, Clone)]
@@ -367,7 +375,7 @@ impl<T: EthSpec, S: Store<T>> Eth1ChainBackend<T, S> for CachingEth1Backend<T, S
_spec: &ChainSpec,
) -> Result<Vec<Deposit>, Error> {
let deposit_index = state.eth1_deposit_index;
let deposit_count = if let Some(new_eth1_data) = get_new_eth1_data(state, eth1_data_vote) {
let deposit_count = if let Some(new_eth1_data) = get_new_eth1_data(state, eth1_data_vote)? {
new_eth1_data.deposit_count
} else {
state.eth1_data.deposit_count

View File

@@ -25,11 +25,22 @@ impl HeadTracker {
/// the upstream user.
/// Registers `block_root` as a chain head.
///
/// The block's parent is evicted from the head set: once it has a child it is, by
/// definition, no longer the tip of a chain.
pub fn register_block<E: EthSpec>(&self, block_root: Hash256, block: &BeaconBlock<E>) {
let mut map = self.0.write();
map.remove(&block.parent_root);
map.insert(block_root, block.slot);
}
/// Deletes `block_root` from the set of tracked heads (e.g. after its fork is pruned).
///
/// In debug builds, asserts that the root was actually being tracked.
pub fn remove_head(&self, block_root: Hash256) {
    let mut heads = self.0.write();
    debug_assert!(heads.contains_key(&block_root));
    heads.remove(&block_root);
}
/// Returns `true` iff `block_root` is currently tracked as a chain head.
pub fn contains_head(&self, block_root: Hash256) -> bool {
    let heads = self.0.read();
    heads.contains_key(&block_root)
}
/// Returns the list of heads in the chain.
pub fn heads(&self) -> Vec<(Hash256, Slot)> {
self.0

View File

@@ -12,6 +12,7 @@ pub mod events;
mod fork_choice;
mod head_tracker;
mod metrics;
pub mod migrate;
mod naive_aggregation_pool;
mod persisted_beacon_chain;
mod shuffling_cache;

View File

@@ -0,0 +1,349 @@
use crate::errors::BeaconChainError;
use crate::head_tracker::HeadTracker;
use parking_lot::Mutex;
use slog::{debug, warn, Logger};
use std::collections::{HashMap, HashSet};
use std::iter::FromIterator;
use std::mem;
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;
use store::iter::{ParentRootBlockIterator, RootsIterator};
use store::{hot_cold_store::HotColdDBError, Error, SimpleDiskStore, Store};
pub use store::{DiskStore, MemoryStore};
use types::*;
use types::{BeaconState, EthSpec, Hash256, Slot};
/// Trait for migration processes that update the database upon finalization.
pub trait Migrate<S: Store<E>, E: EthSpec>: Send + Sync + 'static {
fn new(db: Arc<S>, log: Logger) -> Self;
/// Hook invoked after finalization; the default implementation is a no-op.
///
/// Implementors typically freeze the newly finalized state into cold storage and/or
/// prune abandoned forks (see `prune_abandoned_forks`).
fn process_finalization(
&self,
_state_root: Hash256,
_new_finalized_state: BeaconState<E>,
_max_finality_distance: u64,
_head_tracker: Arc<HeadTracker>,
_old_finalized_block_hash: SignedBeaconBlockHash,
_new_finalized_block_hash: SignedBeaconBlockHash,
) {
}
/// Traverses live heads and prunes blocks and states of chains that we know can't be built
/// upon because finalization would prohibit it. This is an optimisation intended to save disk
/// space.
///
/// Assumptions:
/// * It is called after every finalization.
fn prune_abandoned_forks(
store: Arc<S>,
head_tracker: Arc<HeadTracker>,
old_finalized_block_hash: SignedBeaconBlockHash,
new_finalized_block_hash: SignedBeaconBlockHash,
new_finalized_slot: Slot,
) -> Result<(), BeaconChainError> {
// Slot of the previously finalized block; the per-head walk below never descends past it.
let old_finalized_slot = store
.get_block(&old_finalized_block_hash.into())?
.ok_or_else(|| BeaconChainError::MissingBeaconBlock(old_finalized_block_hash.into()))?
.slot();
// Collect hashes from new_finalized_block back to old_finalized_block (inclusive)
let mut found_block = false; // hack for `take_until`
let newly_finalized_blocks: HashMap<SignedBeaconBlockHash, Slot> = HashMap::from_iter(
ParentRootBlockIterator::new(&*store, new_finalized_block_hash.into())
.take_while(|(block_hash, _)| {
// Emulates an inclusive `take_until`: yields the old finalized block
// itself, then stops on the next iteration.
if found_block {
false
} else {
found_block |= *block_hash == old_finalized_block_hash.into();
true
}
})
.map(|(block_hash, block)| (block_hash.into(), block.slot())),
);
// We don't know which blocks are shared among abandoned chains, so we buffer and delete
// everything in one fell swoop.
let mut abandoned_blocks: HashSet<SignedBeaconBlockHash> = HashSet::new();
let mut abandoned_states: HashSet<(Slot, BeaconStateHash)> = HashSet::new();
let mut abandoned_heads: HashSet<Hash256> = HashSet::new();
for (head_hash, head_slot) in head_tracker.heads() {
// Becomes `None` if this head turns out to lie on the finalized chain.
let mut potentially_abandoned_head: Option<Hash256> = Some(head_hash);
// (slot, block-to-delete, state-to-delete) gathered while walking back from the
// head; discarded if the chain turns out not to be abandoned.
let mut potentially_abandoned_blocks: Vec<(
Slot,
Option<SignedBeaconBlockHash>,
Option<BeaconStateHash>,
)> = Vec::new();
// NOTE(review): uses `BeaconStateError::MissingBeaconBlock` where the earlier
// lookup uses `BeaconChainError` — presumably converted via `From` by `?`;
// confirm the variant exists on the intended error type.
let head_state_hash = store
.get_block(&head_hash)?
.ok_or_else(|| BeaconStateError::MissingBeaconBlock(head_hash.into()))?
.state_root();
// The head itself is not yielded by `RootsIterator`, so chain it in front.
let iterator = std::iter::once((head_hash, head_state_hash, head_slot))
.chain(RootsIterator::from_block(Arc::clone(&store), head_hash)?);
for (block_hash, state_hash, slot) in iterator {
if slot < old_finalized_slot {
// We must assume here any candidate chains include old_finalized_block_hash,
// i.e. there aren't any forks starting at a block that is a strict ancestor of
// old_finalized_block_hash.
break;
}
match newly_finalized_blocks.get(&block_hash.into()).copied() {
// Block is not finalized, mark it and its state for deletion
None => {
potentially_abandoned_blocks.push((
slot,
Some(block_hash.into()),
Some(state_hash.into()),
));
}
Some(finalized_slot) => {
// Block root is finalized, and we have reached the slot it was finalized
// at: we've hit a shared part of the chain.
if finalized_slot == slot {
// The first finalized block of a candidate chain lies after (in terms
// of slots order) the newly finalized block. It's not a candidate for
// pruning.
if finalized_slot == new_finalized_slot {
potentially_abandoned_blocks.clear();
potentially_abandoned_head.take();
}
break;
}
// Block root is finalized, but we're at a skip slot: delete the state only.
else {
potentially_abandoned_blocks.push((
slot,
None,
Some(state_hash.into()),
));
}
}
}
}
abandoned_heads.extend(potentially_abandoned_head.into_iter());
if !potentially_abandoned_blocks.is_empty() {
abandoned_blocks.extend(
potentially_abandoned_blocks
.iter()
.filter_map(|(_, maybe_block_hash, _)| *maybe_block_hash),
);
abandoned_states.extend(potentially_abandoned_blocks.iter().filter_map(
|(slot, _, maybe_state_hash)| match maybe_state_hash {
None => None,
Some(state_hash) => Some((*slot, *state_hash)),
},
));
}
}
// XXX Should be performed atomically, see
// https://github.com/sigp/lighthouse/issues/692
for block_hash in abandoned_blocks.into_iter() {
store.delete_block(&block_hash.into())?;
}
for (slot, state_hash) in abandoned_states.into_iter() {
store.delete_state(&state_hash.into(), slot)?;
}
for head_hash in abandoned_heads.into_iter() {
head_tracker.remove_head(head_hash);
}
Ok(())
}
}
/// Migrator that does nothing, for stores that don't need migration.
///
/// Relies on the trait's default no-op `process_finalization`.
pub struct NullMigrator;
impl<E: EthSpec> Migrate<SimpleDiskStore<E>, E> for NullMigrator {
fn new(_: Arc<SimpleDiskStore<E>>, _: Logger) -> Self {
NullMigrator
}
}
impl<E: EthSpec> Migrate<MemoryStore<E>, E> for NullMigrator {
fn new(_: Arc<MemoryStore<E>>, _: Logger) -> Self {
NullMigrator
}
}
/// Migrator that immediately calls the store's migration function, blocking the current execution.
///
/// Mostly useful for tests.
pub struct BlockingMigrator<S> {
db: Arc<S>,
}
impl<E: EthSpec, S: Store<E>> Migrate<S, E> for BlockingMigrator<S> {
fn new(db: Arc<S>, _: Logger) -> Self {
BlockingMigrator { db }
}
/// Runs the store's finalization processing and then fork pruning, synchronously on the
/// caller's thread. Errors in either step are reported to stderr and swallowed.
fn process_finalization(
&self,
state_root: Hash256,
new_finalized_state: BeaconState<E>,
_max_finality_distance: u64,
head_tracker: Arc<HeadTracker>,
old_finalized_block_hash: SignedBeaconBlockHash,
new_finalized_block_hash: SignedBeaconBlockHash,
) {
if let Err(e) = S::process_finalization(self.db.clone(), state_root, &new_finalized_state) {
// This migrator is only used for testing, so we just log to stderr without a logger.
eprintln!("Migration error: {:?}", e);
}
// Prune forks made unreachable by the new finalized checkpoint.
if let Err(e) = Self::prune_abandoned_forks(
self.db.clone(),
head_tracker,
old_finalized_block_hash,
new_finalized_block_hash,
new_finalized_state.slot,
) {
eprintln!("Pruning error: {:?}", e);
}
}
}
/// Sender half of the channel feeding the background migration thread. The tuple is:
/// (finalized state root, finalized state, head tracker,
/// old finalized block hash, new finalized block hash, new finalized slot).
type MpscSender<E> = mpsc::Sender<(
Hash256,
BeaconState<E>,
Arc<HeadTracker>,
SignedBeaconBlockHash,
SignedBeaconBlockHash,
Slot,
)>;
/// Migrator that runs a background thread to migrate state from the hot to the cold database.
pub struct BackgroundMigrator<E: EthSpec> {
// Hot/cold database handle, shared with the worker thread.
db: Arc<DiskStore<E>>,
// Channel sender plus the worker's join handle; locked together so the sender can be
// swapped out atomically if the worker dies and must be restarted.
tx_thread: Mutex<(MpscSender<E>, thread::JoinHandle<()>)>,
log: Logger,
}
impl<E: EthSpec> Migrate<DiskStore<E>, E> for BackgroundMigrator<E> {
fn new(db: Arc<DiskStore<E>>, log: Logger) -> Self {
let tx_thread = Mutex::new(Self::spawn_thread(db.clone(), log.clone()));
Self { db, tx_thread, log }
}
/// Performs the freezing operation on the database by handing the finalized state to the
/// background thread. If the worker thread has died, it is restarted and the message is
/// retried exactly once.
fn process_finalization(
&self,
finalized_state_root: Hash256,
new_finalized_state: BeaconState<E>,
max_finality_distance: u64,
head_tracker: Arc<HeadTracker>,
old_finalized_block_hash: SignedBeaconBlockHash,
new_finalized_block_hash: SignedBeaconBlockHash,
) {
if !self.needs_migration(new_finalized_state.slot, max_finality_distance) {
return;
}
let (ref mut tx, ref mut thread) = *self.tx_thread.lock();
let new_finalized_slot = new_finalized_state.slot;
// A send error means the receiver is gone, i.e. the worker thread has exited.
if let Err(tx_err) = tx.send((
finalized_state_root,
new_finalized_state,
head_tracker,
old_finalized_block_hash,
new_finalized_block_hash,
new_finalized_slot,
)) {
let (new_tx, new_thread) = Self::spawn_thread(self.db.clone(), self.log.clone());
drop(mem::replace(tx, new_tx));
let old_thread = mem::replace(thread, new_thread);
// Join the old thread, which will probably have panicked, or may have
// halted normally just now as a result of us dropping the old `mpsc::Sender`.
if let Err(thread_err) = old_thread.join() {
warn!(
self.log,
"Migration thread died, so it was restarted";
"reason" => format!("{:?}", thread_err)
);
}
// Retry at most once, we could recurse but that would risk overflowing the stack.
let _ = tx.send(tx_err.0);
}
}
}
impl<E: EthSpec> BackgroundMigrator<E> {
/// Return true if a migration needs to be performed, given a new `finalized_slot`.
fn needs_migration(&self, finalized_slot: Slot, max_finality_distance: u64) -> bool {
// Distance between the new finality point and the current hot/cold split.
let finality_distance = finalized_slot - self.db.get_split_slot();
finality_distance > max_finality_distance
}
/// Spawn a new child thread to run the migration process.
///
/// Return a channel handle for sending new finalized states to the thread.
// NOTE(review): this tuple type duplicates `MpscSender<E>` — presumably they should stay
// in sync; consider reusing the alias.
fn spawn_thread(
db: Arc<DiskStore<E>>,
log: Logger,
) -> (
mpsc::Sender<(
Hash256,
BeaconState<E>,
Arc<HeadTracker>,
SignedBeaconBlockHash,
SignedBeaconBlockHash,
Slot,
)>,
thread::JoinHandle<()>,
) {
let (tx, rx) = mpsc::channel();
// Worker loop: exits when every sender has been dropped (`recv` returns `Err`).
let thread = thread::spawn(move || {
while let Ok((
state_root,
state,
head_tracker,
old_finalized_block_hash,
new_finalized_block_hash,
new_finalized_slot,
)) = rx.recv()
{
// First freeze the finalized state into the cold database...
match DiskStore::process_finalization(db.clone(), state_root, &state) {
Ok(()) => {}
Err(Error::HotColdDBError(HotColdDBError::FreezeSlotUnaligned(slot))) => {
debug!(
log,
"Database migration postponed, unaligned finalized block";
"slot" => slot.as_u64()
);
}
Err(e) => {
warn!(
log,
"Database migration failed";
"error" => format!("{:?}", e)
);
}
};
// ...then prune forks made unreachable by the new finalized checkpoint.
match Self::prune_abandoned_forks(
db.clone(),
head_tracker,
old_finalized_block_hash,
new_finalized_block_hash,
new_finalized_slot,
) {
Ok(()) => {}
Err(e) => warn!(log, "Block pruning failed: {:?}", e),
}
}
});
(tx, thread)
}
}

View File

@@ -1,6 +1,7 @@
pub use crate::beacon_chain::{
BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, FORK_CHOICE_DB_KEY, OP_POOL_DB_KEY,
};
use crate::migrate::{BlockingMigrator, Migrate, NullMigrator};
pub use crate::persisted_beacon_chain::PersistedBeaconChain;
use crate::{
builder::{BeaconChainBuilder, Witness},
@@ -14,16 +15,16 @@ use sloggers::{null::NullLoggerBuilder, Build};
use slot_clock::TestingSlotClock;
use state_processing::per_slot_processing;
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use store::{
migrate::{BlockingMigrator, NullMigrator},
DiskStore, MemoryStore, Migrate, Store,
};
use store::{DiskStore, MemoryStore, Store};
use tempfile::{tempdir, TempDir};
use tree_hash::TreeHash;
use types::{
AggregateSignature, Attestation, BeaconState, ChainSpec, Domain, EthSpec, Hash256, Keypair,
SecretKey, Signature, SignedBeaconBlock, SignedRoot, Slot,
AggregateSignature, Attestation, BeaconState, BeaconStateHash, ChainSpec, Domain, EthSpec,
Hash256, Keypair, SecretKey, Signature, SignedBeaconBlock, SignedBeaconBlockHash, SignedRoot,
Slot,
};
pub use types::test_utils::generate_deterministic_keypairs;
@@ -135,7 +136,10 @@ impl<E: EthSpec> BeaconChainHarness<DiskHarnessType<E>> {
.logger(log.clone())
.custom_spec(spec.clone())
.store(store.clone())
.store_migrator(<BlockingMigrator<_> as Migrate<_, E>>::new(store))
.store_migrator(<BlockingMigrator<_> as Migrate<_, E>>::new(
store,
log.clone(),
))
.data_dir(data_dir.path().to_path_buf())
.genesis_state(
interop_genesis_state::<E>(&keypairs, HARNESS_GENESIS_TIME, &spec)
@@ -175,7 +179,10 @@ impl<E: EthSpec> BeaconChainHarness<DiskHarnessType<E>> {
.logger(log.clone())
.custom_spec(spec)
.store(store.clone())
.store_migrator(<BlockingMigrator<_> as Migrate<_, E>>::new(store))
.store_migrator(<BlockingMigrator<_> as Migrate<_, E>>::new(
store,
log.clone(),
))
.data_dir(data_dir.path().to_path_buf())
.resume_from_db()
.expect("should resume beacon chain from db")
@@ -272,6 +279,123 @@ where
head_block_root.expect("did not produce any blocks")
}
/// Returns current canonical head slot.
///
/// Panics if the chain's slot clock returns an error (acceptable in this
/// test-harness context).
pub fn get_chain_slot(&self) -> Slot {
    self.chain.slot().unwrap()
}
/// Returns current canonical head state.
///
/// Clones the state out of the head snapshot; panics if head lookup fails
/// (acceptable in this test-harness context).
pub fn get_head_state(&self) -> BeaconState<E> {
    self.chain.head().unwrap().beacon_state
}
/// Adds a single block (synchronously) onto either the canonical chain
/// (`BlockStrategy::OnCanonicalHead`) or a fork
/// (`BlockStrategy::ForkCanonicalChainAt`).
///
/// Returns the new block's root and the post-block state.
pub fn add_block(
    &self,
    state: &BeaconState<E>,
    block_strategy: BlockStrategy,
    slot: Slot,
    validators: &[usize],
) -> (SignedBeaconBlockHash, BeaconState<E>) {
    // Tick the harness clock forward until it reaches the target slot.
    loop {
        let current_slot = self.chain.slot().expect("should have a slot");
        if current_slot >= slot {
            break;
        }
        self.advance_slot();
    }

    let (block, post_state) = self.build_block(state.clone(), slot, block_strategy);

    let block_root = self
        .chain
        .process_block(block)
        .expect("should not error during block processing");
    self.chain.fork_choice().expect("should find head");

    // Have the requested validators attest to the freshly imported block.
    self.add_free_attestations(
        &AttestationStrategy::SomeValidators(validators.to_vec()),
        &post_state,
        block_root,
        slot,
    );

    (block_root.into(), post_state)
}
/// `add_block()` repeated `num_blocks` times.
pub fn add_blocks(
&self,
mut state: BeaconState<E>,
mut slot: Slot,
num_blocks: usize,
attesting_validators: &[usize],
block_strategy: BlockStrategy,
) -> (
HashMap<Slot, SignedBeaconBlockHash>,
HashMap<Slot, BeaconStateHash>,
Slot,
SignedBeaconBlockHash,
BeaconState<E>,
) {
let mut blocks: HashMap<Slot, SignedBeaconBlockHash> = HashMap::with_capacity(num_blocks);
let mut states: HashMap<Slot, BeaconStateHash> = HashMap::with_capacity(num_blocks);
for _ in 0..num_blocks {
let (new_root_hash, new_state) =
self.add_block(&state, block_strategy, slot, attesting_validators);
blocks.insert(slot, new_root_hash);
states.insert(slot, new_state.tree_hash_root().into());
state = new_state;
slot += 1;
}
let head_hash = blocks[&(slot - 1)];
(blocks, states, slot, head_hash, state)
}
/// A wrapper on `add_blocks()` to avoid passing enums explicitly.
///
/// Extends the canonical chain from `slot` by `num_blocks` blocks.
pub fn add_canonical_chain_blocks(
    &self,
    state: BeaconState<E>,
    slot: Slot,
    num_blocks: usize,
    attesting_validators: &[usize],
) -> (
    HashMap<Slot, SignedBeaconBlockHash>,
    HashMap<Slot, BeaconStateHash>,
    Slot,
    SignedBeaconBlockHash,
    BeaconState<E>,
) {
    self.add_blocks(
        state,
        slot,
        num_blocks,
        attesting_validators,
        BlockStrategy::OnCanonicalHead,
    )
}
/// A wrapper on `add_blocks()` to avoid passing enums explicitly.
///
/// Builds a fork that branches off the canonical chain at `slot`, with its
/// first block two slots later.
pub fn add_stray_blocks(
    &self,
    state: BeaconState<E>,
    slot: Slot,
    num_blocks: usize,
    attesting_validators: &[usize],
) -> (
    HashMap<Slot, SignedBeaconBlockHash>,
    HashMap<Slot, BeaconStateHash>,
    Slot,
    SignedBeaconBlockHash,
    BeaconState<E>,
) {
    // The fork's first block sits two slots past the branch point.
    let fork_slot = slot + 2;
    self.add_blocks(
        state,
        fork_slot,
        num_blocks,
        attesting_validators,
        BlockStrategy::ForkCanonicalChainAt {
            previous_slot: slot,
            first_slot: fork_slot,
        },
    )
}
/// Returns a newly created block, signed by the proposer for the given slot.
fn build_block(
&self,
@@ -347,7 +471,9 @@ where
.process_attestation(attestation, AttestationType::Aggregated)
.expect("should not error during attestation processing")
{
AttestationProcessingOutcome::Processed => (),
// PastEpoch can occur if we fork over several epochs
AttestationProcessingOutcome::Processed
| AttestationProcessingOutcome::PastEpoch { .. } => (),
other => panic!("did not successfully process attestation: {:?}", other),
}
});