Implement checkpoint sync (#2244)

## Issue Addressed

Closes #1891
Closes #1784

## Proposed Changes

Implement checkpoint sync for Lighthouse, enabling it to start from a weak subjectivity checkpoint.

## Additional Info

- [x] Return unavailable status for out-of-range blocks requested by peers (#2561)
- [x] Implement sync daemon for fetching historical blocks (#2561)
- [x] Verify chain hashes (either in `historical_blocks.rs` or the calling module)
- [x] Consistency check for initial block + state
- [x] Fetch the initial state and block from a beacon node HTTP endpoint
- [x] Don't crash fetching beacon states by slot from the API
- [x] Background service for state reconstruction, triggered by CLI flag or API call.

Considered out of scope for this PR:

- Drop the requirement to provide the `--checkpoint-block` (this would require some pretty heavy refactoring of block verification)


Co-authored-by: Diva M <divma@protonmail.com>
This commit is contained in:
Michael Sproul
2021-09-22 00:37:28 +00:00
parent 280e4fe23d
commit 9667dc2f03
71 changed files with 4012 additions and 459 deletions

View File

@@ -14,6 +14,7 @@ use crate::errors::{BeaconChainError as Error, BlockProductionError};
use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend};
use crate::events::ServerSentEventHandler;
use crate::head_tracker::HeadTracker;
use crate::historical_blocks::HistoricalBlockError;
use crate::migrate::BackgroundMigrator;
use crate::naive_aggregation_pool::{
AggregatedAttestationMap, Error as NaiveAggregationError, NaiveAggregationPool,
@@ -431,10 +432,23 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// - Skipped slots contain the root of the closest prior
/// non-skipped slot (identical to the way they are stored in `state.block_roots`).
/// - Iterator returns `(Hash256, Slot)`.
///
/// Will return a `BlockOutOfRange` error if the requested start slot is before the period of
/// history for which we have blocks stored. See `get_oldest_block_slot`.
pub fn forwards_iter_block_roots(
&self,
start_slot: Slot,
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>>, Error> {
let oldest_block_slot = self.store.get_oldest_block_slot();
if start_slot < oldest_block_slot {
return Err(Error::HistoricalBlockError(
HistoricalBlockError::BlockOutOfRange {
slot: start_slot,
oldest_block_slot,
},
));
}
let local_head = self.head()?;
let iter = HotColdDB::forwards_block_roots_iterator(
@@ -620,6 +634,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Ok(Some(self.genesis_state_root));
}
// Check limits w.r.t historic state bounds.
let (historic_lower_limit, historic_upper_limit) = self.store.get_historic_state_limits();
if request_slot > historic_lower_limit && request_slot < historic_upper_limit {
return Ok(None);
}
// Try an optimized path of reading the root directly from the head state.
let fast_lookup: Option<Hash256> = self.with_head(|head| {
if head.beacon_block.slot() <= request_slot {
@@ -657,7 +677,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// ## Notes
///
/// - Use the `skips` parameter to define the behaviour when `request_slot` is a skipped slot.
/// - Returns `Ok(None)` for any slot higher than the current wall-clock slot.
/// - Returns `Ok(None)` for any slot higher than the current wall-clock slot, or less than
/// the oldest known block slot.
pub fn block_root_at_slot(
&self,
request_slot: Slot,
@@ -667,6 +688,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
WhenSlotSkipped::None => self.block_root_at_slot_skips_none(request_slot),
WhenSlotSkipped::Prev => self.block_root_at_slot_skips_prev(request_slot),
}
.or_else(|e| match e {
Error::HistoricalBlockError(_) => Ok(None),
e => Err(e),
})
}
/// Returns the block root at the given slot, if any. Only returns roots in the canonical chain.

View File

@@ -501,6 +501,9 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
let block_root = get_block_root(&block);
// Disallow blocks that conflict with the anchor (weak subjectivity checkpoint), if any.
check_block_against_anchor_slot(block.message(), chain)?;
// Do not gossip a block from a finalized slot.
check_block_against_finalized_slot(block.message(), chain)?;
@@ -708,6 +711,9 @@ impl<T: BeaconChainTypes> SignatureVerifiedBlock<T> {
.fork_name(&chain.spec)
.map_err(BlockError::InconsistentFork)?;
// Check the anchor slot before loading the parent, to avoid spurious lookups.
check_block_against_anchor_slot(block.message(), chain)?;
let (mut parent, block) = load_parent(block, chain)?;
// Reject any block that exceeds our limit on skipped slots.
@@ -1115,6 +1121,19 @@ fn check_block_skip_slots<T: BeaconChainTypes>(
Ok(())
}
/// Guard against blocks that sit at or before the anchor (weak subjectivity checkpoint) slot.
///
/// Returns `Ok(())` when the store reports no anchor slot, or when the block's slot is strictly
/// greater than the anchor slot. Otherwise returns `BlockError::WeakSubjectivityConflict`.
fn check_block_against_anchor_slot<T: BeaconChainTypes>(
    block: BeaconBlockRef<'_, T::EthSpec>,
    chain: &BeaconChain<T>,
) -> Result<(), BlockError<T::EthSpec>> {
    match chain.store.get_anchor_slot() {
        // The block is not newer than the anchor: it conflicts with the checkpoint.
        Some(anchor_slot) if block.slot() <= anchor_slot => {
            Err(BlockError::WeakSubjectivityConflict)
        }
        // Either there is no anchor, or the block is strictly after it.
        _ => Ok(()),
    }
}
/// Returns `Ok(())` if the block is later than the finalized slot on `chain`.
///
/// Returns an error if the block is earlier or equal to the finalized slot, or there was an error

View File

@@ -28,8 +28,8 @@ use std::time::Duration;
use store::{Error as StoreError, HotColdDB, ItemStore};
use task_executor::ShutdownReason;
use types::{
BeaconBlock, BeaconState, ChainSpec, EthSpec, Graffiti, Hash256, PublicKeyBytes, Signature,
SignedBeaconBlock, Slot,
BeaconBlock, BeaconState, ChainSpec, Checkpoint, EthSpec, Graffiti, Hash256, PublicKeyBytes,
Signature, SignedBeaconBlock, Slot,
};
/// An empty struct used to "witness" all the `BeaconChainTypes` traits. It has no user-facing
@@ -282,12 +282,19 @@ where
Ok(self)
}
/// Starts a new chain from a genesis state.
pub fn genesis_state(
/// Store the genesis state & block in the DB.
///
/// Do *not* initialize fork choice, or do anything that assumes starting from genesis.
///
/// Return the `BeaconSnapshot` representing genesis as well as the mutated builder.
fn set_genesis_state(
mut self,
mut beacon_state: BeaconState<TEthSpec>,
) -> Result<Self, String> {
let store = self.store.clone().ok_or("genesis_state requires a store")?;
) -> Result<(BeaconSnapshot<TEthSpec>, Self), String> {
let store = self
.store
.clone()
.ok_or("set_genesis_state requires a store")?;
let beacon_block = genesis_block(&mut beacon_state, &self.spec)?;
@@ -298,9 +305,6 @@ where
let beacon_state_root = beacon_block.message().state_root();
let beacon_block_root = beacon_block.canonical_root();
self.genesis_state_root = Some(beacon_state_root);
self.genesis_block_root = Some(beacon_block_root);
store
.put_state(&beacon_state_root, &beacon_state)
.map_err(|e| format!("Failed to store genesis state: {:?}", e))?;
@@ -318,11 +322,26 @@ where
)
})?;
let genesis = BeaconSnapshot {
beacon_block,
beacon_block_root,
beacon_state,
};
self.genesis_state_root = Some(beacon_state_root);
self.genesis_block_root = Some(beacon_block_root);
self.genesis_time = Some(beacon_state.genesis_time());
Ok((
BeaconSnapshot {
beacon_block_root,
beacon_block,
beacon_state,
},
self,
))
}
/// Starts a new chain from a genesis state.
pub fn genesis_state(mut self, beacon_state: BeaconState<TEthSpec>) -> Result<Self, String> {
let store = self.store.clone().ok_or("genesis_state requires a store")?;
let (genesis, updated_builder) = self.set_genesis_state(beacon_state)?;
self = updated_builder;
let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store, &genesis);
@@ -332,10 +351,115 @@ where
&genesis.beacon_block,
&genesis.beacon_state,
)
.map_err(|e| format!("Unable to build initialize ForkChoice: {:?}", e))?;
.map_err(|e| format!("Unable to initialize ForkChoice: {:?}", e))?;
self.fork_choice = Some(fork_choice);
Ok(self.empty_op_pool())
}
/// Start the chain from a weak subjectivity state.
///
/// `weak_subj_state` and `weak_subj_block` form the checkpoint to start from, and
/// `genesis_state` is the genesis state of the same network.
///
/// Before anything is written, the following is verified:
///
/// - the checkpoint block and state have the same slot,
/// - that slot is the first slot of an epoch,
/// - the state root recorded in the block matches the root computed from the state,
/// - the checkpoint state and the genesis state share a genesis validators root.
///
/// On success the split point, genesis state and block, checkpoint state and block, anchor
/// info and pruning checkpoint have been written to the store, and fork choice has been
/// initialized from the checkpoint.
///
/// ## Errors
///
/// Returns a descriptive `String` if no store has been configured, if any of the checks above
/// fail, or if any database write or the fork choice initialization fails.
pub fn weak_subjectivity_state(
    mut self,
    mut weak_subj_state: BeaconState<TEthSpec>,
    weak_subj_block: SignedBeaconBlock<TEthSpec>,
    genesis_state: BeaconState<TEthSpec>,
) -> Result<Self, String> {
    let store = self
        .store
        .clone()
        .ok_or("weak_subjectivity_state requires a store")?;

    let weak_subj_slot = weak_subj_state.slot();
    let weak_subj_block_root = weak_subj_block.canonical_root();
    let weak_subj_state_root = weak_subj_block.state_root();

    // Check that the block and state have consistent slots. This is done first so that the
    // alignment error below can refer to "the" checkpoint slot unambiguously.
    if weak_subj_slot != weak_subj_block.slot() {
        return Err(format!(
            "Slot of snapshot block ({}) does not match snapshot state ({})",
            weak_subj_block.slot(),
            weak_subj_state.slot(),
        ));
    }

    // Check that the given block lies on an epoch boundary. Due to the database only storing
    // full states on epoch boundaries and at restore points it would be difficult to support
    // starting from a mid-epoch state.
    if weak_subj_slot % TEthSpec::slots_per_epoch() != 0 {
        return Err(format!(
            "Checkpoint block at slot {} is not aligned to epoch start. \
             Please supply an aligned checkpoint with block.slot % {} == 0",
            weak_subj_block.slot(),
            TEthSpec::slots_per_epoch(),
        ));
    }

    // Check that the state root committed to by the block matches the supplied state.
    let computed_state_root = weak_subj_state
        .update_tree_hash_cache()
        .map_err(|e| format!("Error computing checkpoint state root: {:?}", e))?;

    if weak_subj_state_root != computed_state_root {
        return Err(format!(
            "Snapshot state root does not match block, expected: {:?}, got: {:?}",
            weak_subj_state_root, computed_state_root
        ));
    }

    // Check that the checkpoint state is for the same network as the genesis state.
    // This check doesn't do much for security but should prevent mistakes.
    if weak_subj_state.genesis_validators_root() != genesis_state.genesis_validators_root() {
        return Err(format!(
            "Snapshot state appears to be from the wrong network. Genesis validators root \
             is {:?} but should be {:?}",
            weak_subj_state.genesis_validators_root(),
            genesis_state.genesis_validators_root()
        ));
    }

    // Set the store's split point *before* storing genesis so that genesis is stored
    // immediately in the freezer DB.
    store.set_split(weak_subj_slot, weak_subj_state_root);
    store
        .store_split()
        .map_err(|e| format!("Error storing DB split point: {:?}", e))?;

    // Storing genesis also records the genesis roots and genesis time on the builder, so
    // there is nothing further to set for genesis here.
    let (_, updated_builder) = self.set_genesis_state(genesis_state)?;
    self = updated_builder;

    store
        .put_state(&weak_subj_state_root, &weak_subj_state)
        .map_err(|e| format!("Failed to store weak subjectivity state: {:?}", e))?;
    store
        .put_block(&weak_subj_block_root, weak_subj_block.clone())
        .map_err(|e| format!("Failed to store weak subjectivity block: {:?}", e))?;

    // Store anchor info (context for weak subj sync).
    store
        .init_anchor_info(weak_subj_block.message())
        .map_err(|e| format!("Failed to initialize anchor info: {:?}", e))?;

    // Store pruning checkpoint to prevent attempting to prune before the anchor state.
    store
        .store_pruning_checkpoint(Checkpoint {
            root: weak_subj_block_root,
            epoch: weak_subj_state.slot().epoch(TEthSpec::slots_per_epoch()),
        })
        .map_err(|e| format!("Failed to write pruning checkpoint: {:?}", e))?;

    let snapshot = BeaconSnapshot {
        beacon_block_root: weak_subj_block_root,
        beacon_block: weak_subj_block,
        beacon_state: weak_subj_state,
    };

    // Fork choice is anchored at the checkpoint rather than at genesis.
    let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store, &snapshot);

    let fork_choice = ForkChoice::from_anchor(
        fc_store,
        snapshot.beacon_block_root,
        &snapshot.beacon_block,
        &snapshot.beacon_state,
    )
    .map_err(|e| format!("Unable to initialize ForkChoice: {:?}", e))?;

    self.fork_choice = Some(fork_choice);

    Ok(self.empty_op_pool())
}
@@ -520,12 +644,13 @@ where
let fc_finalized = fork_choice.finalized_checkpoint();
let head_finalized = canonical_head.beacon_state.finalized_checkpoint();
if fc_finalized != head_finalized {
if head_finalized.root == Hash256::zero()
let is_genesis = head_finalized.root.is_zero()
&& head_finalized.epoch == fc_finalized.epoch
&& fc_finalized.root == genesis_block_root
{
// This is a legal edge-case encountered during genesis.
} else {
&& fc_finalized.root == genesis_block_root;
let is_wss = store.get_anchor_slot().map_or(false, |anchor_slot| {
fc_finalized.epoch == anchor_slot.epoch(TEthSpec::slots_per_epoch())
});
if !is_genesis && !is_wss {
return Err(format!(
"Database corrupt: fork choice is finalized at {:?} whilst head is finalized at \
{:?}",
@@ -654,6 +779,11 @@ where
"head_slot" => format!("{}", head.beacon_block.slot()),
);
// Check for states to reconstruct (in the background).
if beacon_chain.config.reconstruct_historic_states {
beacon_chain.store_migrator.process_reconstruction();
}
Ok(beacon_chain)
}
}

View File

@@ -12,6 +12,8 @@ pub struct ChainConfig {
///
/// If `None`, there is no weak subjectivity verification.
pub weak_subjectivity_checkpoint: Option<Checkpoint>,
/// Determine whether to reconstruct historic states, usually after a checkpoint sync.
pub reconstruct_historic_states: bool,
}
impl Default for ChainConfig {
@@ -19,6 +21,7 @@ impl Default for ChainConfig {
Self {
import_max_skip_slots: None,
weak_subjectivity_checkpoint: None,
reconstruct_historic_states: false,
}
}
}

View File

@@ -2,6 +2,7 @@ use crate::attester_cache::Error as AttesterCacheError;
use crate::beacon_chain::ForkChoiceError;
use crate::beacon_fork_choice_store::Error as ForkChoiceStoreError;
use crate::eth1_chain::Error as Eth1ChainError;
use crate::historical_blocks::HistoricalBlockError;
use crate::migrate::PruningError;
use crate::naive_aggregation_pool::Error as NaiveAggregationError;
use crate::observed_aggregates::Error as ObservedAttestationsError;
@@ -117,6 +118,7 @@ pub enum BeaconChainError {
block_slot: Slot,
state_slot: Slot,
},
HistoricalBlockError(HistoricalBlockError),
InvalidStateForShuffling {
state_epoch: Epoch,
shuffling_epoch: Epoch,
@@ -150,6 +152,7 @@ easy_from_to!(BlockSignatureVerifierError, BeaconChainError);
easy_from_to!(PruningError, BeaconChainError);
easy_from_to!(ArithError, BeaconChainError);
easy_from_to!(ForkChoiceStoreError, BeaconChainError);
easy_from_to!(HistoricalBlockError, BeaconChainError);
easy_from_to!(StateAdvanceError, BeaconChainError);
#[derive(Debug)]

View File

@@ -0,0 +1,195 @@
use crate::{errors::BeaconChainError as Error, BeaconChain, BeaconChainTypes};
use itertools::Itertools;
use slog::debug;
use state_processing::{
per_block_processing::ParallelSignatureSets,
signature_sets::{block_proposal_signature_set_from_parts, Error as SignatureSetError},
};
use std::borrow::Cow;
use std::iter;
use std::time::Duration;
use store::{chunked_vector::BlockRoots, AnchorInfo, ChunkWriter, KeyValueStore};
use types::{Hash256, SignedBeaconBlock, Slot};
/// Use a longer timeout on the pubkey cache.
///
/// It's ok if historical sync is stalled due to writes from forwards block processing.
const PUBKEY_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(30);
/// Errors that can arise while serving or importing historical (pre-anchor) blocks.
#[derive(Debug)]
pub enum HistoricalBlockError {
/// Block is not available (only returned when fetching historic blocks).
///
/// `slot` is the slot that was requested and `oldest_block_slot` is the oldest slot for which
/// the store reported having a block at the time of the request.
BlockOutOfRange { slot: Slot, oldest_block_slot: Slot },
/// Block root mismatch, caller should retry with different blocks.
///
/// `block_root` is the root of the supplied block and `expected_block_root` is the parent root
/// it was required to match.
MismatchedBlockRoot {
block_root: Hash256,
expected_block_root: Hash256,
},
/// The proposer signature set for a block could not be constructed, caller should retry with
/// different blocks.
SignatureSet(SignatureSetError),
/// Batch verification of the proposer signatures failed, caller should retry with different
/// blocks.
InvalidSignature,
/// Transitory error, caller should retry with the same blocks.
///
/// Returned when the validator pubkey cache read lock could not be acquired within
/// `PUBKEY_CACHE_LOCK_TIMEOUT`.
ValidatorPubkeyCacheTimeout,
/// No historical sync needed: the store holds no anchor info.
NoAnchorInfo,
/// Logic error: should never occur.
IndexOutOfBounds,
}
impl<T: BeaconChainTypes> BeaconChain<T> {
/// Store a batch of historical blocks in the database.
///
/// The `blocks` should be given in slot-ascending order. One of the blocks should have a block
/// root corresponding to the `oldest_block_parent` from the store's `AnchorInfo`.
///
/// The block roots and proposer signatures are verified. If any block doesn't match the parent
/// root listed in its successor, then the whole batch will be discarded and
/// `MismatchedBlockRoot` will be returned. If any proposer signature is invalid then
/// `SignatureSet` or `InvalidSignature` will be returned.
///
/// To align with sync we allow some excess blocks with slots greater than or equal to
/// `oldest_block_slot` to be provided. They will be ignored without being checked.
///
/// This function should not be called concurrently with any other function that mutates
/// the anchor info (including this function itself). If a concurrent mutation occurs that
/// would violate consistency then an `AnchorInfoConcurrentMutation` error will be returned.
///
/// Returns `NoAnchorInfo` if the store has no anchor info, and `ValidatorPubkeyCacheTimeout`
/// if the pubkey cache read lock cannot be taken within `PUBKEY_CACHE_LOCK_TIMEOUT`. If none
/// of the supplied blocks are older than `oldest_block_slot`, `Ok(0)` is returned before any
/// batch is built or written.
///
/// Return the number of blocks successfully imported.
pub fn import_historical_block_batch(
&self,
blocks: &[SignedBeaconBlock<T::EthSpec>],
) -> Result<usize, Error> {
let anchor_info = self
.store
.get_anchor_info()
.ok_or(HistoricalBlockError::NoAnchorInfo)?;
// Take all blocks with slots less than the oldest block slot.
// `partition_point` relies on `blocks` being sorted by ascending slot.
let num_relevant =
blocks.partition_point(|block| block.slot() < anchor_info.oldest_block_slot);
let blocks_to_import = &blocks
.get(..num_relevant)
.ok_or(HistoricalBlockError::IndexOutOfBounds)?;
if blocks_to_import.len() != blocks.len() {
debug!(
self.log,
"Ignoring some historic blocks";
"oldest_block_slot" => anchor_info.oldest_block_slot,
"total_blocks" => blocks.len(),
"ignored" => blocks.len().saturating_sub(blocks_to_import.len()),
);
}
if blocks_to_import.is_empty() {
return Ok(0);
}
// Running state for the backwards walk: the root the next (older) block must hash to, and
// the slot of the most recently processed (newer) block.
let mut expected_block_root = anchor_info.oldest_block_parent;
let mut prev_block_slot = anchor_info.oldest_block_slot;
let mut chunk_writer =
ChunkWriter::<BlockRoots, _, _>::new(&self.store.cold_db, prev_block_slot.as_usize())?;
let mut cold_batch = Vec::with_capacity(blocks.len());
let mut hot_batch = Vec::with_capacity(blocks.len());
// Iterate from the newest block to the oldest so each block can be checked against the
// parent root recorded by its successor.
for block in blocks_to_import.iter().rev() {
// Check chain integrity.
let block_root = block.canonical_root();
if block_root != expected_block_root {
return Err(HistoricalBlockError::MismatchedBlockRoot {
block_root,
expected_block_root,
}
.into());
}
// Store block in the hot database.
hot_batch.push(self.store.block_as_kv_store_op(&block_root, block));
// Store block roots, including at all skip slots in the freezer DB.
// Every slot from this block's slot up to (but excluding) the previously processed
// block's slot is assigned this block's root.
for slot in (block.slot().as_usize()..prev_block_slot.as_usize()).rev() {
chunk_writer.set(slot, block_root, &mut cold_batch)?;
}
prev_block_slot = block.slot();
expected_block_root = block.message().parent_root();
// If we've reached genesis, add the genesis block root to the batch, then set the
// oldest block slot to the genesis slot and the expected parent root to zero to
// indicate completion.
if expected_block_root == self.genesis_block_root {
let genesis_slot = self.spec.genesis_slot;
chunk_writer.set(
genesis_slot.as_usize(),
self.genesis_block_root,
&mut cold_batch,
)?;
prev_block_slot = genesis_slot;
expected_block_root = Hash256::zero();
break;
}
}
chunk_writer.write(&mut cold_batch)?;
// Verify signatures in one batch, holding the pubkey cache lock for the shortest duration
// possible. For each block fetch the parent root from its successor. Slicing from index 1
// is safe because we've already checked that `blocks_to_import` is non-empty.
let pubkey_cache = self
.validator_pubkey_cache
.try_read_for(PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(HistoricalBlockError::ValidatorPubkeyCacheTimeout)?;
// The root of block `i` is the parent root of block `i + 1`; the newest block's root is the
// anchor's `oldest_block_parent`, which the loop above has already checked it against.
let block_roots = blocks_to_import
.get(1..)
.ok_or(HistoricalBlockError::IndexOutOfBounds)?
.iter()
.map(|block| block.parent_root())
.chain(iter::once(anchor_info.oldest_block_parent));
let signature_set = blocks_to_import
.iter()
.zip_eq(block_roots)
.map(|(block, block_root)| {
block_proposal_signature_set_from_parts(
block,
Some(block_root),
block.message().proposer_index(),
&self.spec.fork_at_epoch(block.message().epoch()),
self.genesis_validators_root,
|validator_index| pubkey_cache.get(validator_index).map(Cow::Borrowed),
&self.spec,
)
})
.collect::<Result<Vec<_>, _>>()
.map_err(HistoricalBlockError::SignatureSet)
.map(ParallelSignatureSets::from)?;
if !signature_set.verify() {
return Err(HistoricalBlockError::InvalidSignature.into());
}
// Release the pubkey cache read lock before doing any disk I/O.
drop(pubkey_cache);
// Write the I/O batches to disk, writing the blocks themselves first, as it's better
// for the hot DB to contain extra blocks than for the cold DB to point to blocks that
// do not exist.
self.store.hot_db.do_atomically(hot_batch)?;
self.store.cold_db.do_atomically(cold_batch)?;
// Update the anchor.
let new_anchor = AnchorInfo {
oldest_block_slot: prev_block_slot,
oldest_block_parent: expected_block_root,
..anchor_info
};
let backfill_complete = new_anchor.block_backfill_complete();
// The previously read anchor info is passed as the expected current value.
self.store
.compare_and_set_anchor_info(Some(anchor_info), Some(new_anchor))?;
// If backfill has completed and the chain is configured to reconstruct historic states,
// send a message to the background migrator instructing it to begin reconstruction.
if backfill_complete && self.config.reconstruct_historic_states {
self.store_migrator.process_reconstruction();
}
Ok(blocks_to_import.len())
}
}

View File

@@ -13,6 +13,7 @@ pub mod eth1_chain;
pub mod events;
pub mod fork_revert;
mod head_tracker;
pub mod historical_blocks;
mod metrics;
pub mod migrate;
mod naive_aggregation_pool;
@@ -39,6 +40,7 @@ pub use self::beacon_chain::{
pub use self::beacon_snapshot::BeaconSnapshot;
pub use self::chain_config::ChainConfig;
pub use self::errors::{BeaconChainError, BlockProductionError};
pub use self::historical_blocks::HistoricalBlockError;
pub use attestation_verification::Error as AttestationError;
pub use beacon_fork_choice_store::{BeaconForkChoiceStore, Error as ForkChoiceStoreError};
pub use block_verification::{BlockError, GossipVerifiedBlock};

View File

@@ -30,7 +30,7 @@ const COMPACTION_FINALITY_DISTANCE: u64 = 1024;
pub struct BackgroundMigrator<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
db: Arc<HotColdDB<E, Hot, Cold>>,
#[allow(clippy::type_complexity)]
tx_thread: Option<Mutex<(mpsc::Sender<MigrationNotification>, thread::JoinHandle<()>)>>,
tx_thread: Option<Mutex<(mpsc::Sender<Notification>, thread::JoinHandle<()>)>>,
/// Genesis block root, for persisting the `PersistedBeaconChain`.
genesis_block_root: Hash256,
log: Logger,
@@ -73,7 +73,12 @@ pub enum PruningError {
}
/// Message sent to the migration thread containing the information it needs to run.
pub struct MigrationNotification {
pub enum Notification {
Finalization(FinalizationNotification),
Reconstruction,
}
pub struct FinalizationNotification {
finalized_state_root: BeaconStateHash,
finalized_checkpoint: Checkpoint,
head_tracker: Arc<HeadTracker>,
@@ -112,13 +117,46 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
finalized_checkpoint: Checkpoint,
head_tracker: Arc<HeadTracker>,
) -> Result<(), BeaconChainError> {
let notif = MigrationNotification {
let notif = FinalizationNotification {
finalized_state_root,
finalized_checkpoint,
head_tracker,
genesis_block_root: self.genesis_block_root,
};
// Send to background thread if configured, otherwise run in foreground.
if let Some(Notification::Finalization(notif)) =
self.send_background_notification(Notification::Finalization(notif))
{
Self::run_migration(self.db.clone(), notif, &self.log);
}
Ok(())
}
/// Request reconstruction of historic states.
///
/// The request is offered to the background thread first. If it is handed back (see
/// `send_background_notification`), reconstruction runs on the calling thread instead.
pub fn process_reconstruction(&self) {
    let returned = self.send_background_notification(Notification::Reconstruction);
    if matches!(returned, Some(Notification::Reconstruction)) {
        Self::run_reconstruction(self.db.clone(), &self.log);
    }
}
/// Perform the actual work of `process_reconstruction`.
///
/// Calls `reconstruct_historic_states` on `db`; a failure is logged at error level and is not
/// propagated to the caller.
pub fn run_reconstruction(db: Arc<HotColdDB<E, Hot, Cold>>, log: &Logger) {
    match db.reconstruct_historic_states() {
        Ok(_) => {}
        Err(e) => {
            error!(
                log,
                "State reconstruction failed";
                "error" => ?e,
            );
        }
    }
}
/// If configured to run in the background, send `notif` to the background thread.
///
/// Return `None` if the message was sent to the background thread, `Some(notif)` otherwise.
#[must_use = "Message is not processed when this function returns `Some`"]
fn send_background_notification(&self, notif: Notification) -> Option<Notification> {
// Async path, on the background thread.
if let Some(tx_thread) = &self.tx_thread {
let (ref mut tx, ref mut thread) = *tx_thread.lock();
@@ -143,17 +181,21 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
// Retry at most once, we could recurse but that would risk overflowing the stack.
let _ = tx.send(tx_err.0);
}
}
None
// Synchronous path, on the current thread.
else {
Self::run_migration(self.db.clone(), notif, &self.log)
} else {
Some(notif)
}
Ok(())
}
/// Perform the actual work of `process_finalization`.
fn run_migration(db: Arc<HotColdDB<E, Hot, Cold>>, notif: MigrationNotification, log: &Logger) {
fn run_migration(
db: Arc<HotColdDB<E, Hot, Cold>>,
notif: FinalizationNotification,
log: &Logger,
) {
debug!(log, "Database consolidation started");
let finalized_state_root = notif.finalized_state_root;
let finalized_state = match db.get_state(&finalized_state_root.into(), None) {
@@ -223,31 +265,44 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
) {
warn!(log, "Database compaction failed"; "error" => format!("{:?}", e));
}
debug!(log, "Database consolidation complete");
}
/// Spawn a new child thread to run the migration process.
///
/// Return a channel handle for sending new finalized states to the thread.
/// Return a channel handle for sending requests to the thread.
fn spawn_thread(
db: Arc<HotColdDB<E, Hot, Cold>>,
log: Logger,
) -> (mpsc::Sender<MigrationNotification>, thread::JoinHandle<()>) {
) -> (mpsc::Sender<Notification>, thread::JoinHandle<()>) {
let (tx, rx) = mpsc::channel();
let thread = thread::spawn(move || {
while let Ok(notif) = rx.recv() {
// Read the rest of the messages in the channel, ultimately choosing the `notif`
// with the highest finalized epoch.
let notif = rx
.try_iter()
.fold(notif, |best, other: MigrationNotification| {
if other.finalized_checkpoint.epoch > best.finalized_checkpoint.epoch {
other
} else {
best
}
});
// Read the rest of the messages in the channel, preferring any reconstruction
// notification, or the finalization notification with the greatest finalized epoch.
let notif =
rx.try_iter()
.fold(notif, |best, other: Notification| match (&best, &other) {
(Notification::Reconstruction, _)
| (_, Notification::Reconstruction) => Notification::Reconstruction,
(
Notification::Finalization(fin1),
Notification::Finalization(fin2),
) => {
if fin2.finalized_checkpoint.epoch > fin1.finalized_checkpoint.epoch
{
other
} else {
best
}
}
});
Self::run_migration(db.clone(), notif, &log);
match notif {
Notification::Reconstruction => Self::run_reconstruction(db.clone(), &log),
Notification::Finalization(fin) => Self::run_migration(db.clone(), fin, &log),
}
}
});
(tx, thread)

View File

@@ -2,12 +2,15 @@
use crate::beacon_chain::{BeaconChainTypes, OP_POOL_DB_KEY};
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use operation_pool::{PersistedOperationPool, PersistedOperationPoolBase};
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use std::fs;
use std::path::Path;
use std::sync::Arc;
use store::config::OnDiskStoreConfig;
use store::hot_cold_store::{HotColdDB, HotColdDBError};
use store::metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION};
use store::Error as StoreError;
use store::metadata::{SchemaVersion, CONFIG_KEY, CURRENT_SCHEMA_VERSION};
use store::{DBColumn, Error as StoreError, ItemStore, StoreItem};
const PUBKEY_CACHE_FILENAME: &str = "pubkey_cache.ssz";
@@ -73,6 +76,23 @@ pub fn migrate_schema<T: BeaconChainTypes>(
Ok(())
}
// Migration for weak subjectivity sync support and clean up of `OnDiskStoreConfig` (#1784).
(SchemaVersion(4), SchemaVersion(5)) => {
if let Some(OnDiskStoreConfigV4 {
slots_per_restore_point,
..
}) = db.hot_db.get(&CONFIG_KEY)?
{
let new_config = OnDiskStoreConfig {
slots_per_restore_point,
};
db.hot_db.put(&CONFIG_KEY, &new_config)?;
}
db.store_schema_version(to)?;
Ok(())
}
// Anything else is an error.
(_, _) => Err(HotColdDBError::UnsupportedSchemaVersion {
target_version: to,
@@ -81,3 +101,24 @@ pub fn migrate_schema<T: BeaconChainTypes>(
.into()),
}
}
// Store config used in v4 schema and earlier.
//
// Only decoded during the v4 -> v5 schema migration, which carries `slots_per_restore_point`
// over to the new `OnDiskStoreConfig` and discards the remaining field.
#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)]
pub struct OnDiskStoreConfigV4 {
// Carried over unchanged into the v5 `OnDiskStoreConfig`.
pub slots_per_restore_point: u64,
// Ignored by the migration; kept so the struct still matches the stored v4 encoding.
pub _block_cache_size: usize,
}
impl StoreItem for OnDiskStoreConfigV4 {
fn db_column() -> DBColumn {
DBColumn::BeaconMeta
}
fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, StoreError> {
Ok(Self::from_ssz_bytes(bytes)?)
}
}

View File

@@ -12,7 +12,7 @@ use crate::{
};
use bls::get_withdrawal_credentials;
use futures::channel::mpsc::Receiver;
use genesis::interop_genesis_state;
pub use genesis::interop_genesis_state;
use int_to_bytes::int_to_bytes32;
use merkle_proof::MerkleTree;
use parking_lot::Mutex;

View File

@@ -1,11 +1,16 @@
#![cfg(not(debug_assertions))]
use beacon_chain::attestation_verification::Error as AttnError;
use beacon_chain::builder::BeaconChainBuilder;
use beacon_chain::test_utils::{
test_logger, test_spec, AttestationStrategy, BeaconChainHarness, BlockStrategy,
DiskHarnessType, HARNESS_SLOT_TIME,
};
use beacon_chain::{BeaconChain, BeaconChainTypes, BeaconSnapshot, ChainConfig};
use beacon_chain::{
historical_blocks::HistoricalBlockError, migrate::MigratorConfig, BeaconChain,
BeaconChainError, BeaconChainTypes, BeaconSnapshot, ChainConfig, ServerSentEventHandler,
WhenSlotSkipped,
};
use lazy_static::lazy_static;
use maplit::hashset;
use rand::Rng;
@@ -558,7 +563,7 @@ fn multiple_attestations_per_block() {
let harness = get_harness(store, HIGH_VALIDATOR_COUNT);
harness.extend_chain(
MainnetEthSpec::slots_per_epoch() as usize * 3,
E::slots_per_epoch() as usize * 3,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
);
@@ -1741,6 +1746,173 @@ fn garbage_collect_temp_states_from_failed_block() {
assert_eq!(store.iter_temporary_state_roots().count(), 0);
}
#[test]
fn weak_subjectivity_sync() {
// Build an initial chain on one harness, representing a synced node with full history.
let num_initial_blocks = E::slots_per_epoch() * 11;
let num_final_blocks = E::slots_per_epoch() * 2;
let temp1 = tempdir().unwrap();
let full_store = get_store(&temp1);
let harness = get_harness(full_store.clone(), LOW_VALIDATOR_COUNT);
harness.extend_chain(
num_initial_blocks as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
);
let genesis_state = full_store
.get_state(&harness.chain.genesis_state_root, Some(Slot::new(0)))
.unwrap()
.unwrap();
let wss_checkpoint = harness.chain.head_info().unwrap().finalized_checkpoint;
let wss_block = harness.get_block(wss_checkpoint.root.into()).unwrap();
let wss_state = full_store
.get_state(&wss_block.state_root(), None)
.unwrap()
.unwrap();
let wss_slot = wss_block.slot();
// Add more blocks that advance finalization further.
harness.advance_slot();
harness.extend_chain(
num_final_blocks as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
);
let (shutdown_tx, _shutdown_rx) = futures::channel::mpsc::channel(1);
let log = test_logger();
let temp2 = tempdir().unwrap();
let store = get_store(&temp2);
// Initialise a new beacon chain from the finalized checkpoint
let beacon_chain = BeaconChainBuilder::new(MinimalEthSpec)
.store(store.clone())
.custom_spec(test_spec::<E>())
.weak_subjectivity_state(wss_state, wss_block.clone(), genesis_state)
.unwrap()
.logger(log.clone())
.store_migrator_config(MigratorConfig::default().blocking())
.dummy_eth1_backend()
.expect("should build dummy backend")
.testing_slot_clock(HARNESS_SLOT_TIME)
.expect("should configure testing slot clock")
.shutdown_sender(shutdown_tx)
.chain_config(ChainConfig::default())
.event_handler(Some(ServerSentEventHandler::new_with_capacity(
log.clone(),
1,
)))
.monitor_validators(true, vec![], log)
.build()
.expect("should build");
// Apply blocks forward to reach head.
let chain_dump = harness.chain.chain_dump().unwrap();
let new_blocks = &chain_dump[wss_slot.as_usize() + 1..];
assert_eq!(new_blocks[0].beacon_block.slot(), wss_slot + 1);
for snapshot in new_blocks {
let block = &snapshot.beacon_block;
beacon_chain.slot_clock.set_slot(block.slot().as_u64());
beacon_chain.process_block(block.clone()).unwrap();
beacon_chain.fork_choice().unwrap();
// Check that the new block's state can be loaded correctly.
let state_root = block.state_root();
let mut state = beacon_chain
.store
.get_state(&state_root, Some(block.slot()))
.unwrap()
.unwrap();
assert_eq!(state.update_tree_hash_cache().unwrap(), state_root);
}
// Forwards iterator from 0 should fail as we lack blocks.
assert!(matches!(
beacon_chain.forwards_iter_block_roots(Slot::new(0)),
Err(BeaconChainError::HistoricalBlockError(
HistoricalBlockError::BlockOutOfRange { .. }
))
));
// Simulate processing of a `StatusMessage` with an older finalized epoch by calling
// `block_root_at_slot` with an old slot for which we don't know the block root. It should
// return `None` rather than erroring.
assert_eq!(
beacon_chain
.block_root_at_slot(Slot::new(1), WhenSlotSkipped::None)
.unwrap(),
None
);
// Simulate querying the API for a historic state that is unknown. It should also return
// `None` rather than erroring.
assert_eq!(beacon_chain.state_root_at_slot(Slot::new(1)).unwrap(), None);
// Supply blocks backwards to reach genesis. Omit the genesis block to check genesis handling.
let historical_blocks = chain_dump[..wss_block.slot().as_usize()]
.iter()
.filter(|s| s.beacon_block.slot() != 0)
.map(|s| s.beacon_block.clone())
.collect::<Vec<_>>();
beacon_chain
.import_historical_block_batch(&historical_blocks)
.unwrap();
assert_eq!(beacon_chain.store.get_oldest_block_slot(), 0);
// Resupplying the blocks should not fail, they can be safely ignored.
beacon_chain
.import_historical_block_batch(&historical_blocks)
.unwrap();
// The forwards iterator should now match the original chain
let forwards = beacon_chain
.forwards_iter_block_roots(Slot::new(0))
.unwrap()
.map(Result::unwrap)
.collect::<Vec<_>>();
let expected = harness
.chain
.forwards_iter_block_roots(Slot::new(0))
.unwrap()
.map(Result::unwrap)
.collect::<Vec<_>>();
assert_eq!(forwards, expected);
// All blocks can be loaded.
for (block_root, slot) in beacon_chain
.forwards_iter_block_roots(Slot::new(0))
.unwrap()
.map(Result::unwrap)
{
let block = store.get_block(&block_root).unwrap().unwrap();
assert_eq!(block.slot(), slot);
}
// All states from the oldest state slot can be loaded.
let (_, oldest_state_slot) = store.get_historic_state_limits();
for (state_root, slot) in beacon_chain
.forwards_iter_state_roots(oldest_state_slot)
.unwrap()
.map(Result::unwrap)
{
let state = store.get_state(&state_root, Some(slot)).unwrap().unwrap();
assert_eq!(state.slot(), slot);
assert_eq!(state.canonical_root(), state_root);
}
// Anchor slot is still set to the starting slot.
assert_eq!(store.get_anchor_slot(), Some(wss_slot));
// Reconstruct states.
store.clone().reconstruct_historic_states().unwrap();
assert_eq!(store.get_anchor_slot(), None);
}
#[test]
fn finalizes_after_resuming_from_db() {
let validator_count = 16;

View File

@@ -32,6 +32,8 @@ futures = "0.3.7"
reqwest = { version = "0.11.0", features = ["native-tls-vendored"] }
url = "2.1.1"
eth1 = { path = "../eth1" }
eth2 = { path = "../../common/eth2" }
sensitive_url = { path = "../../common/sensitive_url" }
genesis = { path = "../genesis" }
task_executor = { path = "../../common/task_executor" }
environment = { path = "../../lighthouse/environment" }

View File

@@ -12,6 +12,10 @@ use beacon_chain::{
};
use environment::RuntimeContext;
use eth1::{Config as Eth1Config, Service as Eth1Service};
use eth2::{
types::{BlockId, StateId},
BeaconNodeHttpClient, Error as ApiError, Timeouts,
};
use eth2_libp2p::NetworkGlobals;
use genesis::{interop_genesis_state, Eth1GenesisService};
use monitoring_api::{MonitoringHttpClient, ProcessType};
@@ -25,11 +29,16 @@ use std::sync::Arc;
use std::time::Duration;
use timer::spawn_timer;
use tokio::sync::{mpsc::UnboundedSender, oneshot};
use types::{test_utils::generate_deterministic_keypairs, BeaconState, ChainSpec, EthSpec};
use types::{
test_utils::generate_deterministic_keypairs, BeaconState, ChainSpec, EthSpec, SignedBeaconBlock,
};
/// Interval between polling the eth1 node for genesis information.
pub const ETH1_GENESIS_UPDATE_INTERVAL_MILLIS: u64 = 7_000;
/// Timeout for checkpoint sync HTTP requests.
pub const CHECKPOINT_SYNC_HTTP_TIMEOUT: Duration = Duration::from_secs(60);
/// Builds a `Client` instance.
///
/// ## Notes
@@ -168,11 +177,22 @@ where
//
// Alternatively, if there's a beacon chain in the database then always resume
// using it.
let client_genesis = if client_genesis == ClientGenesis::FromStore && !chain_exists {
let client_genesis = if matches!(client_genesis, ClientGenesis::FromStore) && !chain_exists
{
info!(context.log(), "Defaulting to deposit contract genesis");
ClientGenesis::DepositContract
} else if chain_exists {
if matches!(client_genesis, ClientGenesis::WeakSubjSszBytes { .. })
|| matches!(client_genesis, ClientGenesis::CheckpointSyncUrl { .. })
{
info!(
context.log(),
"Refusing to checkpoint sync";
"msg" => "database already exists, use --purge-db to force checkpoint sync"
);
}
ClientGenesis::FromStore
} else {
client_genesis
@@ -200,6 +220,103 @@ where
builder.genesis_state(genesis_state).map(|v| (v, None))?
}
ClientGenesis::WeakSubjSszBytes {
anchor_state_bytes,
anchor_block_bytes,
genesis_state_bytes,
} => {
info!(context.log(), "Starting checkpoint sync");
let anchor_state = BeaconState::from_ssz_bytes(&anchor_state_bytes, &spec)
.map_err(|e| format!("Unable to parse weak subj state SSZ: {:?}", e))?;
let anchor_block = SignedBeaconBlock::from_ssz_bytes(&anchor_block_bytes, &spec)
.map_err(|e| format!("Unable to parse weak subj block SSZ: {:?}", e))?;
let genesis_state = BeaconState::from_ssz_bytes(&genesis_state_bytes, &spec)
.map_err(|e| format!("Unable to parse genesis state SSZ: {:?}", e))?;
builder
.weak_subjectivity_state(anchor_state, anchor_block, genesis_state)
.map(|v| (v, None))?
}
ClientGenesis::CheckpointSyncUrl {
genesis_state_bytes,
url,
} => {
info!(
context.log(),
"Starting checkpoint sync";
"remote_url" => %url,
);
let remote =
BeaconNodeHttpClient::new(url, Timeouts::set_all(CHECKPOINT_SYNC_HTTP_TIMEOUT));
let slots_per_epoch = TEthSpec::slots_per_epoch();
// Find a suitable finalized block on an epoch boundary.
let mut block = remote
.get_beacon_blocks_ssz::<TEthSpec>(BlockId::Finalized, &spec)
.await
.map_err(|e| match e {
ApiError::InvalidSsz(e) => format!(
"Unable to parse SSZ: {:?}. Ensure the checkpoint-sync-url refers to a \
node for the correct network",
e
),
e => format!("Error fetching finalized block from remote: {:?}", e),
})?
.ok_or("Finalized block missing from remote, it returned 404")?;
let mut block_slot = block.slot();
while block.slot() % slots_per_epoch != 0 {
block_slot = (block_slot / slots_per_epoch - 1) * slots_per_epoch;
debug!(
context.log(),
"Searching for aligned checkpoint block";
"block_slot" => block_slot,
);
if let Some(found_block) = remote
.get_beacon_blocks_ssz::<TEthSpec>(BlockId::Slot(block_slot), &spec)
.await
.map_err(|e| {
format!("Error fetching block at slot {}: {:?}", block_slot, e)
})?
{
block = found_block;
}
}
let state_root = block.state_root();
let state = remote
.get_debug_beacon_states_ssz::<TEthSpec>(StateId::Root(state_root), &spec)
.await
.map_err(|e| {
format!(
"Error loading checkpoint state from remote {:?}: {:?}",
state_root, e
)
})?
.ok_or_else(|| {
format!("Checkpoint state missing from remote: {:?}", state_root)
})?;
let genesis_state = BeaconState::from_ssz_bytes(&genesis_state_bytes, &spec)
.map_err(|e| format!("Unable to parse genesis state SSZ: {:?}", e))?;
info!(
context.log(),
"Loaded checkpoint block and state";
"slot" => block.slot(),
"block_root" => ?block.canonical_root(),
"state_root" => ?state_root,
);
builder
.weak_subjectivity_state(state, block, genesis_state)
.map(|v| (v, None))?
}
ClientGenesis::DepositContract => {
info!(
context.log(),

View File

@@ -1,5 +1,6 @@
use directory::DEFAULT_ROOT_DIR;
use network::NetworkConfig;
use sensitive_url::SensitiveUrl;
use serde_derive::{Deserialize, Serialize};
use std::fs;
use std::path::PathBuf;
@@ -9,7 +10,7 @@ use types::{Graffiti, PublicKeyBytes};
const DEFAULT_FREEZER_DB_DIR: &str = "freezer_db";
/// Defines how the client should initialize the `BeaconChain` and other components.
#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ClientGenesis {
/// Creates a genesis state as per the 2019 Canada interop specifications.
Interop {
@@ -26,6 +27,15 @@ pub enum ClientGenesis {
/// We include the bytes instead of the `BeaconState<E>` because the `EthSpec` type
/// parameter would be very annoying.
SszBytes { genesis_state_bytes: Vec<u8> },
WeakSubjSszBytes {
genesis_state_bytes: Vec<u8>,
anchor_state_bytes: Vec<u8>,
anchor_block_bytes: Vec<u8>,
},
CheckpointSyncUrl {
genesis_state_bytes: Vec<u8>,
url: SensitiveUrl,
},
}
impl Default for ClientGenesis {

View File

@@ -1,6 +1,6 @@
use crate::metrics;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::NetworkGlobals;
use eth2_libp2p::{types::SyncState, NetworkGlobals};
use parking_lot::Mutex;
use slog::{debug, error, info, warn, Logger};
use slot_clock::SlotClock;
@@ -19,6 +19,9 @@ const MINUTES_PER_HOUR: i64 = 60;
/// The number of historical observations that should be used to determine the average sync time.
const SPEEDO_OBSERVATIONS: usize = 4;
/// The number of slots between logs that give detail about the backfill process.
const BACKFILL_LOG_INTERVAL: u64 = 5;
/// Spawns a notifier service which periodically logs information about the node.
pub fn spawn_notifier<T: BeaconChainTypes>(
executor: task_executor::TaskExecutor,
@@ -42,6 +45,16 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
let log = executor.log().clone();
let mut interval = tokio::time::interval_at(start_instant, interval_duration);
// Keep track of sync state and reset the speedo on specific sync state changes.
// Specifically, if we switch between a sync and a backfill sync, reset the speedo.
let mut current_sync_state = network.sync_state();
// Store info if we are required to do a backfill sync.
let original_anchor_slot = beacon_chain
.store
.get_anchor_info()
.map(|ai| ai.oldest_block_slot);
let interval_future = async move {
// Perform pre-genesis logging.
loop {
@@ -63,11 +76,30 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
}
// Perform post-genesis logging.
let mut last_backfill_log_slot = None;
loop {
interval.tick().await;
let connected_peer_count = network.connected_peers();
let sync_state = network.sync_state();
// Determine if we have switched syncing chains
if sync_state != current_sync_state {
match (current_sync_state, &sync_state) {
(_, SyncState::BackFillSyncing { .. }) => {
// We have transitioned to a backfill sync. Reset the speedo.
let mut speedo = speedo.lock();
speedo.clear();
}
(SyncState::BackFillSyncing { .. }, _) => {
// We have transitioned from a backfill sync, reset the speedo
let mut speedo = speedo.lock();
speedo.clear();
}
(_, _) => {}
}
current_sync_state = sync_state;
}
let head_info = match beacon_chain.head_info() {
Ok(head_info) => head_info,
Err(e) => {
@@ -97,17 +129,46 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
let finalized_root = head_info.finalized_checkpoint.root;
let head_root = head_info.block_root;
let mut speedo = speedo.lock();
speedo.observe(head_slot, Instant::now());
// The default is for regular sync but this gets modified if backfill sync is in
// progress.
let mut sync_distance = current_slot - head_slot;
let mut speedo = speedo.lock();
match current_sync_state {
SyncState::BackFillSyncing { .. } => {
// Observe backfilling sync info.
if let Some(oldest_slot) = original_anchor_slot {
if let Some(current_anchor_slot) = beacon_chain
.store
.get_anchor_info()
.map(|ai| ai.oldest_block_slot)
{
sync_distance = current_anchor_slot;
speedo
// For backfill sync use a fake slot which is the distance we've progressed from the starting `oldest_block_slot`.
.observe(
oldest_slot.saturating_sub(current_anchor_slot),
Instant::now(),
);
}
}
}
SyncState::SyncingFinalized { .. }
| SyncState::SyncingHead { .. }
| SyncState::SyncTransition => {
speedo.observe(head_slot, Instant::now());
}
SyncState::Stalled | SyncState::Synced => {}
}
// NOTE: This is going to change based on which sync we are currently performing. A
// backfill sync should process slots significantly faster than the other sync
// processes.
metrics::set_gauge(
&metrics::SYNC_SLOTS_PER_SECOND,
speedo.slots_per_second().unwrap_or(0_f64) as i64,
);
// The next two lines take advantage of saturating subtraction on `Slot`.
let head_distance = current_slot - head_slot;
if connected_peer_count <= WARN_PEER_COUNT {
warn!(log, "Low peer count"; "peer_count" => peer_count_pretty(connected_peer_count));
}
@@ -121,16 +182,57 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
"head_block" => format!("{}", head_root),
"head_slot" => head_slot,
"current_slot" => current_slot,
"sync_state" =>format!("{}", sync_state)
"sync_state" =>format!("{}", current_sync_state)
);
// Log if we are backfilling.
let is_backfilling = matches!(current_sync_state, SyncState::BackFillSyncing { .. });
if is_backfilling
&& last_backfill_log_slot
.map_or(true, |slot| slot + BACKFILL_LOG_INTERVAL <= current_slot)
{
last_backfill_log_slot = Some(current_slot);
let distance = format!(
"{} slots ({})",
sync_distance.as_u64(),
slot_distance_pretty(sync_distance, slot_duration)
);
let speed = speedo.slots_per_second();
let display_speed = speed.map_or(false, |speed| speed != 0.0);
if display_speed {
info!(
log,
"Downloading historical blocks";
"distance" => distance,
"speed" => sync_speed_pretty(speed),
"est_time" => estimated_time_pretty(speedo.estimated_time_till_slot(original_anchor_slot.unwrap_or(current_slot))),
);
} else {
info!(
log,
"Downloading historical blocks";
"distance" => distance,
"est_time" => estimated_time_pretty(speedo.estimated_time_till_slot(original_anchor_slot.unwrap_or(current_slot))),
);
}
} else if !is_backfilling && last_backfill_log_slot.is_some() {
last_backfill_log_slot = None;
info!(
log,
"Historical block download complete";
);
}
// Log if we are syncing
if sync_state.is_syncing() {
if current_sync_state.is_syncing() {
metrics::set_gauge(&metrics::IS_SYNCED, 0);
let distance = format!(
"{} slots ({})",
head_distance.as_u64(),
slot_distance_pretty(head_distance, slot_duration)
sync_distance.as_u64(),
slot_distance_pretty(sync_distance, slot_duration)
);
let speed = speedo.slots_per_second();
@@ -154,7 +256,7 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
"est_time" => estimated_time_pretty(speedo.estimated_time_till_slot(current_slot)),
);
}
} else if sync_state.is_synced() {
} else if current_sync_state.is_synced() {
metrics::set_gauge(&metrics::IS_SYNCED, 1);
let block_info = if current_slot > head_slot {
" … empty".to_string()
@@ -397,4 +499,9 @@ impl Speedo {
None
}
}
/// Discards every past observation so the speedo can be reused for an alternative sync
/// (i.e. a backfill sync).
pub fn clear(&mut self) {
    let observations = &mut self.0;
    observations.clear();
}
}

View File

@@ -522,7 +522,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
}
/// Inform the peer that their request produced an error.
pub fn _send_error_reponse(
pub fn send_error_reponse(
&mut self,
peer_id: PeerId,
id: PeerRequestId,

View File

@@ -578,8 +578,17 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
RPCResponseErrorCode::Unknown => PeerAction::HighToleranceError,
RPCResponseErrorCode::ResourceUnavailable => {
// NOTE: This error only makes sense for the `BlocksByRange` and `BlocksByRoot`
// protocols. For the time being, there is no reason why a peer should send
// this error.
// protocols.
//
// If we are syncing, there is no point keeping these peers around and
// continually failing to request blocks. We instantly ban them and hope that
// by the time the ban lifts, the peers will have completed their backfill
// sync.
//
// TODO: Potentially a more graceful way of handling such peers, would be to
// implement a new sync type which tracks these peers and prevents the sync
// algorithms from requesting blocks from them (at least for a set period of
// time, multiple failures would then lead to a ban).
PeerAction::Fatal
}
RPCResponseErrorCode::ServerError => PeerAction::MidToleranceError,

View File

@@ -211,16 +211,13 @@ mod tests {
let _ = snappy_buf.split_to(1);
// decode message just as snappy message
let snappy_decoded_message = snappy_outbound_codec.decode(&mut snappy_buf).unwrap();
let _snappy_decoded_message = snappy_outbound_codec.decode(&mut snappy_buf).unwrap();
// build codecs for entire chunk
let mut snappy_base_outbound_codec = BaseOutboundCodec::new(snappy_outbound_codec);
// decode message as ssz snappy chunk
let snappy_decoded_chunk = snappy_base_outbound_codec.decode(&mut buf).unwrap();
dbg!(snappy_decoded_message);
dbg!(snappy_decoded_chunk);
let _snappy_decoded_chunk = snappy_base_outbound_codec.decode(&mut buf).unwrap();
}
#[test]

View File

@@ -275,7 +275,7 @@ impl<TSpec: EthSpec> Service<TSpec> {
) {
self.swarm
.behaviour_mut()
._send_error_reponse(peer_id, id, error, reason);
.send_error_reponse(peer_id, id, error, reason);
}
/// Report a peer's action.

View File

@@ -1,7 +1,7 @@
//! A collection of variables that are accessible outside of the network thread itself.
use crate::peer_manager::PeerDB;
use crate::rpc::MetaData;
use crate::types::SyncState;
use crate::types::{BackFillState, SyncState};
use crate::Client;
use crate::EnrExt;
use crate::{Enr, GossipTopic, Multiaddr, PeerId};
@@ -29,6 +29,8 @@ pub struct NetworkGlobals<TSpec: EthSpec> {
pub gossipsub_subscriptions: RwLock<HashSet<GossipTopic>>,
/// The current sync status of the node.
pub sync_state: RwLock<SyncState>,
/// The current state of the backfill sync.
pub backfill_state: RwLock<BackFillState>,
}
impl<TSpec: EthSpec> NetworkGlobals<TSpec> {
@@ -50,6 +52,7 @@ impl<TSpec: EthSpec> NetworkGlobals<TSpec> {
peers: RwLock::new(PeerDB::new(trusted_peers, log)),
gossipsub_subscriptions: RwLock::new(HashSet::new()),
sync_state: RwLock::new(SyncState::Stalled),
backfill_state: RwLock::new(BackFillState::NotRequired),
}
}
@@ -104,6 +107,11 @@ impl<TSpec: EthSpec> NetworkGlobals<TSpec> {
self.sync_state.read().clone()
}
/// Returns the current backfill state.
pub fn backfill_state(&self) -> BackFillState {
self.backfill_state.read().clone()
}
/// Returns a `Client` type if one is known for the `PeerId`.
pub fn client(&self, peer_id: &PeerId) -> Client {
self.peers

View File

@@ -15,5 +15,5 @@ pub type Enr = discv5::enr::Enr<discv5::enr::CombinedKey>;
pub use globals::NetworkGlobals;
pub use pubsub::{PubsubMessage, SnappyTransform};
pub use subnet::{Subnet, SubnetDiscovery};
pub use sync_state::SyncState;
pub use sync_state::{BackFillState, SyncState};
pub use topics::{subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, CORE_TOPICS};

View File

@@ -10,8 +10,13 @@ pub enum SyncState {
/// The node is performing a long-range (batch) sync over one or many head chains.
/// In this state parent lookups are disabled.
SyncingHead { start_slot: Slot, target_slot: Slot },
/// The node has identified the need for is sync operations and is transitioning to a syncing
/// state.
/// The node is undertaking a backfill sync. This occurs when a user has specified a trusted
/// state. The node first syncs "forward" by downloading blocks up to the current head as
/// specified by its peers. Once completed, the node enters this sync state and attempts to
/// download all required historical blocks to complete its chain.
BackFillSyncing { completed: usize, remaining: usize },
/// The node has completed syncing a finalized chain and is in the process of re-evaluating
/// which sync state to progress to.
SyncTransition,
/// The node is up to date with all known peers and is connected to at least one
/// fully synced peer. In this state, parent lookups are enabled.
@@ -21,6 +26,21 @@ pub enum SyncState {
Stalled,
}
/// The state of the backfill sync.
///
/// Exactly one variant applies at a time; `NotRequired` is the value `NetworkGlobals` is
/// initialised with.
#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)]
pub enum BackFillState {
/// The sync is partially completed and currently paused.
Paused,
/// We are currently backfilling.
Syncing,
/// A backfill sync has completed.
Completed,
/// A backfill sync is not required.
NotRequired,
/// Too many failed attempts at backfilling. Consider it failed.
Failed,
}
impl PartialEq for SyncState {
fn eq(&self, other: &Self) -> bool {
matches!(
@@ -32,6 +52,10 @@ impl PartialEq for SyncState {
| (SyncState::Synced, SyncState::Synced)
| (SyncState::Stalled, SyncState::Stalled)
| (SyncState::SyncTransition, SyncState::SyncTransition)
| (
SyncState::BackFillSyncing { .. },
SyncState::BackFillSyncing { .. }
)
)
}
}
@@ -43,14 +67,18 @@ impl SyncState {
SyncState::SyncingFinalized { .. } => true,
SyncState::SyncingHead { .. } => true,
SyncState::SyncTransition => true,
// Backfill doesn't affect any logic, so we consider this state to be not syncing.
SyncState::BackFillSyncing { .. } => false,
SyncState::Synced => false,
SyncState::Stalled => false,
}
}
/// Returns true if the node is synced.
///
/// NOTE: We consider the node synced if it is fetching old historical blocks.
pub fn is_synced(&self) -> bool {
matches!(self, SyncState::Synced)
matches!(self, SyncState::Synced | SyncState::BackFillSyncing { .. })
}
}
@@ -61,7 +89,8 @@ impl std::fmt::Display for SyncState {
SyncState::SyncingHead { .. } => write!(f, "Syncing Head Chain"),
SyncState::Synced { .. } => write!(f, "Synced"),
SyncState::Stalled { .. } => write!(f, "Stalled"),
SyncState::SyncTransition => write!(f, "Searching syncing peers"),
SyncState::SyncTransition => write!(f, "Evaluating known peers"),
SyncState::BackFillSyncing { .. } => write!(f, "Syncing Historical Blocks"),
}
}
}

View File

@@ -0,0 +1,33 @@
use beacon_chain::store::{metadata::CURRENT_SCHEMA_VERSION, AnchorInfo};
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2::lighthouse::DatabaseInfo;
use std::sync::Arc;
use types::SignedBeaconBlock;
/// Builds the `DatabaseInfo` summary for the chain's store: the current schema version together
/// with the store's split and anchor information.
pub fn info<T: BeaconChainTypes>(
    chain: Arc<BeaconChain<T>>,
) -> Result<DatabaseInfo, warp::Rejection> {
    // Read the split before the anchor, in the same order as before.
    let split = chain.store.get_split_info();
    let anchor = chain.store.get_anchor_info();
    let schema_version = CURRENT_SCHEMA_VERSION.as_u64();

    let summary = DatabaseInfo {
        schema_version,
        split,
        anchor,
    };
    Ok(summary)
}
/// Imports `blocks` as a batch of historical blocks and returns the store's anchor info
/// afterwards.
///
/// An import failure is turned into a rejection via `warp_utils::reject::beacon_chain_error`.
/// If the store has no anchor info after the import, a bad-request rejection is returned.
pub fn historical_blocks<T: BeaconChainTypes>(
    chain: Arc<BeaconChain<T>>,
    blocks: Vec<SignedBeaconBlock<T::EthSpec>>,
) -> Result<AnchorInfo, warp::Rejection> {
    // The value returned on a successful import is not needed here; only failure matters.
    if let Err(import_error) = chain.import_historical_block_batch(&blocks) {
        return Err(warp_utils::reject::beacon_chain_error(import_error));
    }

    match chain.store.get_anchor_info() {
        Some(anchor) => Ok(anchor),
        None => Err(warp_utils::reject::custom_bad_request(
            "node is not checkpoint synced".to_string(),
        )),
    }
}

View File

@@ -7,6 +7,7 @@
mod attester_duties;
mod block_id;
mod database;
mod metrics;
mod proposer_duties;
mod state_id;
@@ -349,7 +350,7 @@ pub fn serve<T: BeaconChainTypes>(
)))
}
}
SyncState::SyncingHead { .. } | SyncState::SyncTransition => Ok(()),
SyncState::SyncingHead { .. } | SyncState::SyncTransition | SyncState::BackFillSyncing { .. } => Ok(()),
SyncState::Synced => Ok(()),
SyncState::Stalled => Err(warp_utils::reject::not_synced(
"sync is stalled".to_string(),
@@ -1579,7 +1580,8 @@ pub fn serve<T: BeaconChainTypes>(
blocking_task(move || match *network_globals.sync_state.read() {
SyncState::SyncingFinalized { .. }
| SyncState::SyncingHead { .. }
| SyncState::SyncTransition => Ok(warp::reply::with_status(
| SyncState::SyncTransition
| SyncState::BackFillSyncing { .. } => Ok(warp::reply::with_status(
warp::reply(),
warp::http::StatusCode::PARTIAL_CONTENT,
)),
@@ -2040,7 +2042,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("validator"))
.and(warp::path("contribution_and_proofs"))
.and(warp::path::end())
.and(not_while_syncing_filter)
.and(not_while_syncing_filter.clone())
.and(chain_filter.clone())
.and(warp::body::json())
.and(network_tx_filter.clone())
@@ -2399,6 +2401,49 @@ pub fn serve<T: BeaconChainTypes>(
})
});
let database_path = warp::path("lighthouse").and(warp::path("database"));
// GET lighthouse/database/info
let get_lighthouse_database_info = database_path
.and(warp::path("info"))
.and(warp::path::end())
.and(chain_filter.clone())
.and_then(|chain: Arc<BeaconChain<T>>| blocking_json_task(move || database::info(chain)));
// POST lighthouse/database/reconstruct
let post_lighthouse_database_reconstruct = database_path
.and(warp::path("reconstruct"))
.and(warp::path::end())
.and(not_while_syncing_filter)
.and(chain_filter.clone())
.and_then(|chain: Arc<BeaconChain<T>>| {
blocking_json_task(move || {
chain.store_migrator.process_reconstruction();
Ok("success")
})
});
// POST lighthouse/database/historical_blocks
let post_lighthouse_database_historical_blocks = database_path
.and(warp::path("historical_blocks"))
.and(warp::path::end())
.and(warp::body::json())
.and(chain_filter.clone())
.and(log_filter.clone())
.and_then(
|blocks: Vec<SignedBeaconBlock<T::EthSpec>>,
chain: Arc<BeaconChain<T>>,
log: Logger| {
info!(
log,
"Importing historical blocks";
"count" => blocks.len(),
"source" => "http_api"
);
blocking_json_task(move || database::historical_blocks(chain, blocks))
},
);
let get_events = eth1_v1
.and(warp::path("events"))
.and(warp::path::end())
@@ -2510,6 +2555,7 @@ pub fn serve<T: BeaconChainTypes>(
.or(get_lighthouse_eth1_deposit_cache.boxed())
.or(get_lighthouse_beacon_states_ssz.boxed())
.or(get_lighthouse_staking.boxed())
.or(get_lighthouse_database_info.boxed())
.or(get_events.boxed()),
)
.or(warp::post().and(
@@ -2526,7 +2572,9 @@ pub fn serve<T: BeaconChainTypes>(
.or(post_validator_contribution_and_proofs.boxed())
.or(post_validator_beacon_committee_subscriptions.boxed())
.or(post_validator_sync_committee_subscriptions.boxed())
.or(post_lighthouse_liveness.boxed()),
.or(post_lighthouse_liveness.boxed())
.or(post_lighthouse_database_reconstruct.boxed())
.or(post_lighthouse_database_historical_blocks.boxed()),
))
.recover(warp_utils::reject::handle_rejection)
.with(slog_logging(log.clone()))

View File

@@ -2101,6 +2101,29 @@ impl ApiTester {
self
}
/// Fetches the database info over the HTTP client and checks that the anchor, split and schema
/// version it reports match what the chain's store and `store::metadata` report directly.
pub async fn test_get_lighthouse_database_info(self) -> Self {
    let reported = self.client.get_lighthouse_database_info().await.unwrap();

    // Only borrow the store once the request has completed.
    let db = &self.chain.store;
    assert_eq!(reported.anchor, db.get_anchor_info());
    assert_eq!(reported.split, db.get_split_info());

    let expected_schema_version = store::metadata::CURRENT_SCHEMA_VERSION.as_u64();
    assert_eq!(reported.schema_version, expected_schema_version);

    self
}
/// Calls the database reconstruct endpoint through the HTTP client and checks that it replies
/// with `"success"`.
pub async fn test_post_lighthouse_database_reconstruct(self) -> Self {
    let outcome = self.client.post_lighthouse_database_reconstruct().await;
    let body = outcome.unwrap();
    assert_eq!(body, "success");

    self
}
pub async fn test_post_lighthouse_liveness(self) -> Self {
let epoch = self.chain.epoch().unwrap();
let head_state = self.chain.head_beacon_state().unwrap();
@@ -2653,6 +2676,10 @@ async fn lighthouse_endpoints() {
.await
.test_get_lighthouse_staking()
.await
.test_get_lighthouse_database_info()
.await
.test_post_lighthouse_database_reconstruct()
.await
.test_post_lighthouse_liveness()
.await;
}

View File

@@ -2,7 +2,7 @@ use crate::beacon_processor::worker::FUTURE_SLOT_TOLERANCE;
use crate::service::NetworkMessage;
use crate::status::ToStatusMessage;
use crate::sync::SyncMessage;
use beacon_chain::{BeaconChainError, BeaconChainTypes, WhenSlotSkipped};
use beacon_chain::{BeaconChainError, BeaconChainTypes, HistoricalBlockError, WhenSlotSkipped};
use eth2_libp2p::rpc::StatusMessage;
use eth2_libp2p::rpc::*;
use eth2_libp2p::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo};
@@ -38,6 +38,21 @@ impl<T: BeaconChainTypes> Worker<T> {
})
}
/// Wraps the given error code and reason in a `NetworkMessage::SendErrorResponse` addressed to
/// `peer_id` for request `id`, and passes it to `send_network_message`.
pub fn send_error_response(
    &self,
    peer_id: PeerId,
    error: RPCResponseErrorCode,
    reason: String,
    id: PeerRequestId,
) {
    let message = NetworkMessage::SendErrorResponse {
        peer_id,
        error,
        reason,
        id,
    };
    self.send_network_message(message)
}
/* Processing functions */
/// Process a `Status` message to determine if a peer is relevant to us. If the peer is
@@ -163,6 +178,20 @@ impl<T: BeaconChainTypes> Worker<T> {
.forwards_iter_block_roots(Slot::from(req.start_slot))
{
Ok(iter) => iter,
Err(BeaconChainError::HistoricalBlockError(
HistoricalBlockError::BlockOutOfRange {
slot,
oldest_block_slot,
},
)) => {
debug!(self.log, "Range request failed during backfill"; "requested_slot" => slot, "oldest_known_slot" => oldest_block_slot);
return self.send_error_response(
peer_id,
RPCResponseErrorCode::ResourceUnavailable,
"Backfilling".into(),
request_id,
);
}
Err(e) => return error!(self.log, "Unable to obtain root iter"; "error" => ?e),
};

View File

@@ -2,9 +2,11 @@ use super::{super::work_reprocessing_queue::ReprocessQueueMessage, Worker};
use crate::beacon_processor::worker::FUTURE_SLOT_TOLERANCE;
use crate::beacon_processor::BlockResultSender;
use crate::metrics;
use crate::sync::manager::SyncMessage;
use crate::sync::manager::{SyncMessage, SyncRequestType};
use crate::sync::{BatchProcessResult, ChainId};
use beacon_chain::{BeaconChainTypes, BlockError, ChainSegmentResult};
use beacon_chain::{
BeaconChainError, BeaconChainTypes, BlockError, ChainSegmentResult, HistoricalBlockError,
};
use eth2_libp2p::PeerId;
use slog::{crit, debug, error, info, trace, warn};
use tokio::sync::mpsc;
@@ -15,6 +17,8 @@ use types::{Epoch, Hash256, SignedBeaconBlock};
pub enum ProcessId {
/// Processing Id of a range syncing batch.
RangeBatchId(ChainId, Epoch),
/// Processing ID for a backfill syncing batch.
BackSyncBatchId(Epoch),
/// Processing Id of the parent lookup of a block.
ParentLookup(PeerId, Hash256),
}
@@ -99,11 +103,40 @@ impl<T: BeaconChainTypes> Worker<T> {
}
};
self.send_sync_message(SyncMessage::BatchProcessed {
chain_id,
epoch,
result,
});
let sync_type = SyncRequestType::RangeSync(epoch, chain_id);
self.send_sync_message(SyncMessage::BatchProcessed { sync_type, result });
}
// this is a request from the backfill sync
ProcessId::BackSyncBatchId(epoch) => {
let start_slot = downloaded_blocks.first().map(|b| b.slot().as_u64());
let end_slot = downloaded_blocks.last().map(|b| b.slot().as_u64());
let sent_blocks = downloaded_blocks.len();
let result = match self.process_backfill_blocks(&downloaded_blocks) {
(_, Ok(_)) => {
debug!(self.log, "Backfill batch processed";
"batch_epoch" => epoch,
"first_block_slot" => start_slot,
"last_block_slot" => end_slot,
"processed_blocks" => sent_blocks,
"service"=> "sync");
BatchProcessResult::Success(sent_blocks > 0)
}
(_, Err(e)) => {
debug!(self.log, "Backfill batch processing failed";
"batch_epoch" => epoch,
"first_block_slot" => start_slot,
"last_block_slot" => end_slot,
"error" => e,
"service" => "sync");
BatchProcessResult::Failed(false)
}
};
let sync_type = SyncRequestType::BackFillSync(epoch);
self.send_sync_message(SyncMessage::BatchProcessed { sync_type, result });
}
// this is a parent lookup request from the sync manager
ProcessId::ParentLookup(peer_id, chain_head) => {
@@ -160,6 +193,80 @@ impl<T: BeaconChainTypes> Worker<T> {
}
}
/// Imports a batch of backfill blocks through `import_historical_block_batch` and records the
/// outcome in the backfill success/failure counters.
///
/// Returns the number of imported blocks paired with `Ok(())` on success. On failure it logs
/// the error and returns `0` paired with a short string naming the failure.
fn process_backfill_blocks(
    &self,
    blocks: &[SignedBeaconBlock<T::EthSpec>],
) -> (usize, Result<(), String>) {
    // Success path: bump the counter and leave early.
    let failure = match self.chain.import_historical_block_batch(blocks) {
        Ok(imported_blocks) => {
            metrics::inc_counter(&metrics::BEACON_PROCESSOR_BACKFILL_CHAIN_SEGMENT_SUCCESS_TOTAL);
            return (imported_blocks, Ok(()));
        }
        Err(failure) => failure,
    };

    metrics::inc_counter(&metrics::BEACON_PROCESSOR_BACKFILL_CHAIN_SEGMENT_FAILED_TOTAL);

    let reason: String = match failure {
        // Historical block errors each get their own log line and short label.
        BeaconChainError::HistoricalBlockError(historical) => match historical {
            HistoricalBlockError::MismatchedBlockRoot {
                block_root,
                expected_block_root,
            } => {
                debug!(
                    self.log,
                    "Backfill batch processing error";
                    "error" => "mismatched_block_root",
                    "block_root" => ?block_root,
                    "expected_root" => ?expected_block_root
                );
                "mismatched_block_root".to_string()
            }
            HistoricalBlockError::InvalidSignature | HistoricalBlockError::SignatureSet(_) => {
                warn!(
                    self.log,
                    "Backfill batch processing error";
                    "error" => ?historical
                );
                "invalid_signature".to_string()
            }
            HistoricalBlockError::ValidatorPubkeyCacheTimeout => {
                warn!(
                    self.log,
                    "Backfill batch processing error";
                    "error" => "pubkey_cache_timeout"
                );
                "pubkey_cache_timeout".to_string()
            }
            HistoricalBlockError::NoAnchorInfo => {
                warn!(self.log, "Backfill not required");
                "no_anchor_info".to_string()
            }
            // These two are logged at `error!` level and reported as a logic error.
            HistoricalBlockError::IndexOutOfBounds
            | HistoricalBlockError::BlockOutOfRange { .. } => {
                error!(
                    self.log,
                    "Backfill batch processing error";
                    "error" => ?historical,
                );
                "logic_error".to_string()
            }
        },
        // Any other chain error is reported via its `Debug` representation.
        other => {
            warn!(self.log, "Backfill batch processing error"; "error" => ?other);
            format!("{:?}", other)
        }
    };

    (0, Err(reason))
}
/// Runs fork-choice on a given chain. This is used during block processing after one successful
/// block import.
fn run_fork_choice(&self) {

View File

@@ -338,10 +338,18 @@ lazy_static! {
"beacon_processor_chain_segment_success_total",
"Total number of chain segments successfully processed."
);
// Counts backfill chain segments that were imported successfully. The help text names
// "backfill" explicitly so it is distinguishable from the range-sync counter and matches
// the corresponding backfill failure counter.
pub static ref BEACON_PROCESSOR_BACKFILL_CHAIN_SEGMENT_SUCCESS_TOTAL: Result<IntCounter> = try_create_int_counter(
    "beacon_processor_backfill_chain_segment_success_total",
    "Total number of backfill chain segments successfully processed."
);
pub static ref BEACON_PROCESSOR_CHAIN_SEGMENT_FAILED_TOTAL: Result<IntCounter> = try_create_int_counter(
"beacon_processor_chain_segment_failed_total",
"Total number of chain segments that failed processing."
);
pub static ref BEACON_PROCESSOR_BACKFILL_CHAIN_SEGMENT_FAILED_TOTAL: Result<IntCounter> = try_create_int_counter(
"beacon_processor_backfill_chain_segment_failed_total",
"Total number of backfill chain segments that failed processing."
);
// Unaggregated attestations.
pub static ref BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_unaggregated_attestation_queue_total",

View File

@@ -418,7 +418,7 @@ impl<T: EthSpec> HandlerNetworkContext<T> {
error: RPCResponseErrorCode,
reason: String,
) {
self.inform_network(NetworkMessage::SendError {
self.inform_network(NetworkMessage::SendErrorResponse {
peer_id,
error,
id,

View File

@@ -63,10 +63,8 @@ pub enum NetworkMessage<T: EthSpec> {
response: Response<T>,
id: PeerRequestId,
},
/// Respond to a peer's request with an error.
SendError {
// NOTE: Currently this is never used, we just say goodbye without nicely closing the
// stream assigned to the request
/// Sends an error response to an RPC request.
SendErrorResponse {
peer_id: PeerId,
error: RPCResponseErrorCode,
reason: String,
@@ -386,7 +384,7 @@ fn spawn_service<T: BeaconChainTypes>(
NetworkMessage::SendResponse{ peer_id, response, id } => {
service.libp2p.send_response(peer_id, id, response);
}
NetworkMessage::SendError{ peer_id, error, id, reason } => {
NetworkMessage::SendErrorResponse{ peer_id, error, id, reason } => {
service.libp2p.respond_with_error(peer_id, id, error, reason);
}
NetworkMessage::UPnPMappingEstablished { tcp_socket, udp_socket} => {

File diff suppressed because it is too large Load Diff

View File

@@ -33,6 +33,7 @@
//! needs to be searched for (i.e if an attestation references an unknown block) this manager can
//! search for the block and subsequently search for parents if needed.
use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart};
use super::network_context::SyncNetworkContext;
use super::peer_sync_info::{remote_sync_type, PeerSyncType};
use super::range_sync::{ChainId, RangeSync, RangeSyncType, EPOCHS_PER_BATCH};
@@ -77,14 +78,14 @@ pub enum SyncMessage<T: EthSpec> {
/// A useful peer has been discovered.
AddPeer(PeerId, SyncInfo),
/// A `BlocksByRange` response has been received.
/// A [`BlocksByRange`] response has been received.
BlocksByRangeResponse {
peer_id: PeerId,
request_id: RequestId,
beacon_block: Option<Box<SignedBeaconBlock<T>>>,
},
/// A `BlocksByRoot` response has been received.
/// A [`BlocksByRoot`] response has been received.
BlocksByRootResponse {
peer_id: PeerId,
request_id: RequestId,
@@ -106,8 +107,7 @@ pub enum SyncMessage<T: EthSpec> {
/// A batch has been processed by the block processor thread.
BatchProcessed {
chain_id: ChainId,
epoch: Epoch,
sync_type: SyncRequestType,
result: BatchProcessResult,
},
@@ -120,6 +120,15 @@ pub enum SyncMessage<T: EthSpec> {
},
}
/// Identifies which sync algorithm issued a request (or batch), so that responses and
/// processing results can be dispatched back to that algorithm.
#[derive(Debug, Clone)]
pub enum SyncRequestType {
/// The request was made by the backfill sync algorithm. Carries the epoch identifying the
/// batch.
BackFillSync(Epoch),
/// The request was made by a chain in the range sync algorithm. Carries the epoch
/// identifying the batch and the id of the chain it belongs to.
RangeSync(Epoch, ChainId),
}
/// The result of processing multiple blocks (a chain segment).
#[derive(Debug)]
pub enum BatchProcessResult {
@@ -166,6 +175,9 @@ pub struct SyncManager<T: BeaconChainTypes> {
/// The object handling long-range batch load-balanced syncing.
range_sync: RangeSync<T>,
/// Backfill syncing.
backfill_sync: BackFillSync<T>,
/// A collection of parent block lookups.
parent_queue: SmallVec<[ParentRequests<T::EthSpec>; 3]>,
@@ -227,6 +239,12 @@ pub fn spawn<T: BeaconChainTypes>(
beacon_processor_send.clone(),
log.clone(),
),
backfill_sync: BackFillSync::new(
beacon_chain.clone(),
network_globals.clone(),
beacon_processor_send.clone(),
log.clone(),
),
network: SyncNetworkContext::new(network_send, network_globals.clone(), log.clone()),
chain: beacon_chain,
network_globals,
@@ -576,6 +594,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}
/// Handles RPC errors related to requests that were emitted from the sync manager.
fn inject_error(&mut self, peer_id: PeerId, request_id: RequestId) {
trace!(self.log, "Sync manager received a failed RPC");
// remove any single block lookups
@@ -597,14 +616,16 @@ impl<T: BeaconChainTypes> SyncManager<T> {
return;
}
// otherwise, this is a range sync issue, notify the range sync
self.range_sync
.inject_error(&mut self.network, peer_id, request_id);
self.update_sync_state();
// Otherwise this error matches no known request.
trace!(self.log, "Response/Error for non registered request"; "request_id" => request_id)
}
/// Handles the disconnection of a peer: notifies both range sync and backfill sync, then
/// refreshes the global sync state.
fn peer_disconnect(&mut self, peer_id: &PeerId) {
self.range_sync.peer_disconnect(&mut self.network, peer_id);
// The backfill result is deliberately discarded: regardless of the outcome, we update
// the sync status below.
let _ = self
.backfill_sync
.peer_disconnected(peer_id, &mut self.network);
self.update_sync_state();
}
@@ -624,12 +645,18 @@ impl<T: BeaconChainTypes> SyncManager<T> {
if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
let new_state = sync_type.as_sync_status(remote_sync_info);
let rpr = new_state.as_str();
let was_updated = peer_info.sync_status.update(new_state);
let was_updated = peer_info.sync_status.update(new_state.clone());
if was_updated {
debug!(self.log, "Peer transitioned sync state"; "peer_id" => %peer_id, "new_state" => rpr,
"our_head_slot" => local_sync_info.head_slot, "out_finalized_epoch" => local_sync_info.finalized_epoch,
"their_head_slot" => remote_sync_info.head_slot, "their_finalized_epoch" => remote_sync_info.finalized_epoch,
"is_connected" => peer_info.is_connected());
// A peer has transitioned its sync state. If the new state is "synced" we
// inform the backfill sync that a new synced peer has joined us.
if new_state.is_synced() {
self.backfill_sync.fully_synced_peer_joined();
}
}
peer_info.is_connected()
} else {
@@ -638,7 +665,17 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}
/// Updates the global sync state and logs any changes.
/// Updates the global sync state, optionally instigating or pausing a backfill sync as well as
/// logging any changes.
///
/// The logic for which sync should be running is as follows:
/// - If there is a range-sync running (or required) pause any backfill and let range-sync
/// complete.
/// - If there is no current range sync, check for any requirement to backfill and either
/// start/resume a backfill sync if required. The global state will be BackFillSync if a
/// backfill sync is running.
/// - If there is no range sync and no required backfill and we have synced up to the currently
/// known peers, we consider ourselves synced.
fn update_sync_state(&mut self) {
let new_state: SyncState = match self.range_sync.state() {
Err(e) => {
@@ -647,41 +684,75 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
Ok(state) => match state {
None => {
// no range sync, decide if we are stalled or synced.
// No range sync, so we decide if we are stalled or synced.
// For this we check if there is at least one advanced peer. An advanced peer
// with Idle range is possible since a peer's status is updated periodically.
// If we synced a peer between status messages, most likely the peer has
// advanced and will produce a head chain on re-status. Otherwise it will shift
// to being synced
let head = self.chain.best_slot().unwrap_or_else(|_| Slot::new(0));
let current_slot = self.chain.slot().unwrap_or_else(|_| Slot::new(0));
let mut sync_state = {
let head = self.chain.best_slot().unwrap_or_else(|_| Slot::new(0));
let current_slot = self.chain.slot().unwrap_or_else(|_| Slot::new(0));
let peers = self.network_globals.peers.read();
if current_slot >= head
&& current_slot.sub(head) <= (SLOT_IMPORT_TOLERANCE as u64)
&& head > 0
{
SyncState::Synced
} else if peers.advanced_peers().next().is_some() {
SyncState::SyncTransition
} else if peers.synced_peers().next().is_none() {
SyncState::Stalled
} else {
// There are no peers that require syncing and we have at least one synced
// peer
SyncState::Synced
let peers = self.network_globals.peers.read();
if current_slot >= head
&& current_slot.sub(head) <= (SLOT_IMPORT_TOLERANCE as u64)
&& head > 0
{
SyncState::Synced
} else if peers.advanced_peers().next().is_some() {
SyncState::SyncTransition
} else if peers.synced_peers().next().is_none() {
SyncState::Stalled
} else {
// There are no peers that require syncing and we have at least one synced
// peer
SyncState::Synced
}
};
// If we would otherwise be synced, first check if we need to perform or
// complete a backfill sync.
if matches!(sync_state, SyncState::Synced) {
// Determine if we need to start/resume/restart a backfill sync.
match self.backfill_sync.start(&mut self.network) {
Ok(SyncStart::Syncing {
completed,
remaining,
}) => {
sync_state = SyncState::BackFillSyncing {
completed,
remaining,
};
}
Ok(SyncStart::NotSyncing) => {} // Ignore updating the state if the backfill sync state didn't start.
Err(e) => {
error!(self.log, "Backfill sync failed to start"; "error" => ?e);
}
}
}
// Return the sync state if backfilling is not required.
sync_state
}
Some((RangeSyncType::Finalized, start_slot, target_slot)) => {
// If there is a backfill sync in progress pause it.
self.backfill_sync.pause();
SyncState::SyncingFinalized {
start_slot,
target_slot,
}
}
Some((RangeSyncType::Head, start_slot, target_slot)) => SyncState::SyncingHead {
start_slot,
target_slot,
},
Some((RangeSyncType::Head, start_slot, target_slot)) => {
// If there is a backfill sync in progress pause it.
self.backfill_sync.pause();
SyncState::SyncingHead {
start_slot,
target_slot,
}
}
},
};
@@ -690,7 +761,14 @@ impl<T: BeaconChainTypes> SyncManager<T> {
if !new_state.eq(&old_state) {
info!(self.log, "Sync state updated"; "old_state" => %old_state, "new_state" => %new_state);
// If we have become synced - Subscribe to all the core subnet topics
if new_state.is_synced() {
// We don't need to subscribe if the old state is a state that would have already
// invoked this call.
if new_state.is_synced()
&& !matches!(
old_state,
SyncState::Synced { .. } | SyncState::BackFillSyncing { .. }
)
{
self.network.subscribe_core_topics();
}
}
@@ -828,14 +906,13 @@ impl<T: BeaconChainTypes> SyncManager<T> {
// peer. We don't consider this chain a failure and prevent retries with another
// peer.
"too many failed attempts"
} else {
if !parent_request.downloaded_blocks.is_empty() {
self.failed_chains
.insert(parent_request.downloaded_blocks[0].canonical_root());
} else {
crit!(self.log, "Parent lookup has no blocks");
}
} else if !parent_request.downloaded_blocks.is_empty() {
self.failed_chains
.insert(parent_request.downloaded_blocks[0].canonical_root());
"reached maximum lookup-depth"
} else {
crit!(self.log, "Parent lookup has no blocks");
"no blocks"
};
debug!(self.log, "Parent import failed";
@@ -887,13 +964,44 @@ impl<T: BeaconChainTypes> SyncManager<T> {
request_id,
beacon_block,
} => {
self.range_sync.blocks_by_range_response(
&mut self.network,
peer_id,
request_id,
beacon_block.map(|b| *b),
);
self.update_sync_state();
let beacon_block = beacon_block.map(|b| *b);
// Obtain which sync requested these blocks and divert accordingly.
match self
.network
.blocks_by_range_response(request_id, beacon_block.is_none())
{
Some(SyncRequestType::RangeSync(batch_id, chain_id)) => {
self.range_sync.blocks_by_range_response(
&mut self.network,
peer_id,
chain_id,
batch_id,
request_id,
beacon_block,
);
self.update_sync_state();
}
Some(SyncRequestType::BackFillSync(batch_id)) => {
match self.backfill_sync.on_block_response(
&mut self.network,
batch_id,
&peer_id,
request_id,
beacon_block,
) {
Ok(ProcessResult::SyncCompleted) => self.update_sync_state(),
Ok(ProcessResult::Successful) => {}
Err(_error) => {
// The backfill sync has failed, errors are reported
// within.
self.update_sync_state();
}
}
}
None => {
trace!(self.log, "Response/Error for non registered request"; "request_id" => request_id)
}
}
}
SyncMessage::BlocksByRootResponse {
peer_id,
@@ -913,21 +1021,63 @@ impl<T: BeaconChainTypes> SyncManager<T> {
self.peer_disconnect(&peer_id);
}
SyncMessage::RPCError(peer_id, request_id) => {
self.inject_error(peer_id, request_id);
}
SyncMessage::BatchProcessed {
chain_id,
epoch,
result,
} => {
self.range_sync.handle_block_process_result(
&mut self.network,
chain_id,
epoch,
result,
);
self.update_sync_state();
// Redirect to a sync mechanism if the error is related to one of their
// requests.
match self.network.blocks_by_range_response(request_id, true) {
Some(SyncRequestType::RangeSync(batch_id, chain_id)) => {
self.range_sync.inject_error(
&mut self.network,
peer_id,
batch_id,
chain_id,
request_id,
);
self.update_sync_state();
}
Some(SyncRequestType::BackFillSync(batch_id)) => {
match self.backfill_sync.inject_error(
&mut self.network,
batch_id,
&peer_id,
request_id,
) {
Ok(_) => {}
Err(_) => self.update_sync_state(),
}
}
None => {
// This is a request not belonging to a sync algorithm.
// Process internally.
self.inject_error(peer_id, request_id);
}
}
}
SyncMessage::BatchProcessed { sync_type, result } => match sync_type {
SyncRequestType::RangeSync(epoch, chain_id) => {
self.range_sync.handle_block_process_result(
&mut self.network,
chain_id,
epoch,
result,
);
self.update_sync_state();
}
SyncRequestType::BackFillSync(epoch) => {
match self.backfill_sync.on_batch_process_result(
&mut self.network,
epoch,
&result,
) {
Ok(ProcessResult::Successful) => {}
Ok(ProcessResult::SyncCompleted) => self.update_sync_state(),
Err(error) => {
error!(self.log, "Backfill sync failed"; "error" => ?error);
// Update the global status
self.update_sync_state();
}
}
}
},
SyncMessage::ParentLookupFailed {
chain_head,
peer_id,

View File

@@ -1,6 +1,7 @@
//! Syncing for lighthouse.
//!
//! Stores the various syncing methods for the beacon chain.
mod backfill_sync;
pub mod manager;
mod network_context;
mod peer_sync_info;

View File

@@ -1,6 +1,7 @@
//! Provides network functionality for the Syncing thread. This fundamentally wraps a network
//! channel and stores a global RPC ID to perform requests.
use super::manager::SyncRequestType;
use super::range_sync::{BatchId, ChainId};
use super::RequestId as SyncRequestId;
use crate::service::NetworkMessage;
@@ -26,8 +27,8 @@ pub struct SyncNetworkContext<T: EthSpec> {
/// A sequential ID for all RPC requests.
request_id: SyncRequestId,
/// BlocksByRange requests made by range syncing chains.
range_requests: FnvHashMap<SyncRequestId, (ChainId, BatchId)>,
/// BlocksByRange requests made by syncing algorithms.
range_requests: FnvHashMap<SyncRequestId, SyncRequestType>,
/// Logger for the `SyncNetworkContext`.
log: slog::Logger,
@@ -81,6 +82,7 @@ impl<T: EthSpec> SyncNetworkContext<T> {
}
}
/// A blocks by range request for the range sync algorithm.
pub fn blocks_by_range_request(
&mut self,
peer_id: PeerId,
@@ -96,15 +98,37 @@ impl<T: EthSpec> SyncNetworkContext<T> {
"peer" => %peer_id,
);
let req_id = self.send_rpc_request(peer_id, Request::BlocksByRange(request))?;
self.range_requests.insert(req_id, (chain_id, batch_id));
self.range_requests
.insert(req_id, SyncRequestType::RangeSync(batch_id, chain_id));
Ok(req_id)
}
/// Sends a `BlocksByRange` request on behalf of the backfill sync algorithm.
///
/// On success the request id is recorded against `batch_id` as a backfill request and then
/// returned. If the request cannot be sent the error is returned and nothing is recorded.
pub fn backfill_blocks_by_range_request(
    &mut self,
    peer_id: PeerId,
    request: BlocksByRangeRequest,
    batch_id: BatchId,
) -> Result<SyncRequestId, &'static str> {
    trace!(
        self.log,
        "Sending backfill BlocksByRange Request";
        "method" => "BlocksByRange",
        "count" => request.count,
        "peer" => %peer_id,
    );

    let rpc = Request::BlocksByRange(request);
    let req_id = self.send_rpc_request(peer_id, rpc)?;

    // Remember who asked, so the response or error can be matched to this batch later.
    let origin = SyncRequestType::BackFillSync(batch_id);
    self.range_requests.insert(req_id, origin);

    Ok(req_id)
}
/// Received a blocks by range response.
pub fn blocks_by_range_response(
&mut self,
request_id: usize,
remove: bool,
) -> Option<(ChainId, BatchId)> {
) -> Option<SyncRequestType> {
// NOTE: we can't guarantee that the request must be registered as it could receive more
// than an error, and be removed after receiving the first one.
// FIXME: https://github.com/sigp/lighthouse/issues/1634
@@ -115,6 +139,7 @@ impl<T: EthSpec> SyncNetworkContext<T> {
}
}
/// Sends a blocks by root request.
pub fn blocks_by_root_request(
&mut self,
peer_id: PeerId,
@@ -130,6 +155,7 @@ impl<T: EthSpec> SyncNetworkContext<T> {
self.send_rpc_request(peer_id, Request::BlocksByRoot(request))
}
/// Terminates the connection with the peer and bans them.
pub fn goodbye_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) {
self.network_send
.send(NetworkMessage::GoodbyePeer {
@@ -142,6 +168,7 @@ impl<T: EthSpec> SyncNetworkContext<T> {
});
}
/// Reports to the scoring algorithm the behaviour of a peer.
pub fn report_peer(&mut self, peer_id: PeerId, action: PeerAction) {
debug!(self.log, "Sync reporting peer"; "peer_id" => %peer_id, "action" => %action);
self.network_send
@@ -155,7 +182,8 @@ impl<T: EthSpec> SyncNetworkContext<T> {
});
}
pub fn send_rpc_request(
/// Sends an RPC request.
fn send_rpc_request(
&mut self,
peer_id: PeerId,
request: Request,
@@ -170,6 +198,7 @@ impl<T: EthSpec> SyncNetworkContext<T> {
Ok(request_id)
}
/// Subscribes to core topics.
pub fn subscribe_core_topics(&mut self) {
self.network_send
.send(NetworkMessage::SubscribeCoreTopics)
@@ -178,6 +207,7 @@ impl<T: EthSpec> SyncNetworkContext<T> {
});
}
/// Sends an arbitrary network message.
fn send_network_msg(&mut self, msg: NetworkMessage<T>) -> Result<(), &'static str> {
self.network_send.send(msg).map_err(|_| {
debug!(self.log, "Could not send message to the network service");

View File

@@ -14,15 +14,34 @@ const MAX_BATCH_DOWNLOAD_ATTEMPTS: u8 = 5;
/// after `MAX_BATCH_PROCESSING_ATTEMPTS` times, it is considered faulty.
const MAX_BATCH_PROCESSING_ATTEMPTS: u8 = 3;
/// Allows customisation of the above constants used in other sync methods such as BackFillSync.
///
/// Implementors supply the retry limits as associated functions; `BatchInfo` is generic over
/// this trait and compares its failed-attempt counts against these limits.
pub trait BatchConfig {
/// The maximum number of failed download attempts before a batch is considered failed.
fn max_batch_download_attempts() -> u8;
/// The maximum number of failed processing attempts before a batch is considered failed.
fn max_batch_processing_attempts() -> u8;
}
/// The `BatchConfig` used by range sync (and the default for `BatchInfo`). It simply exposes
/// this module's `MAX_BATCH_DOWNLOAD_ATTEMPTS` and `MAX_BATCH_PROCESSING_ATTEMPTS` constants.
pub struct RangeSyncBatchConfig {}
impl BatchConfig for RangeSyncBatchConfig {
// Returns the module-level download retry limit.
fn max_batch_download_attempts() -> u8 {
MAX_BATCH_DOWNLOAD_ATTEMPTS
}
// Returns the module-level processing retry limit.
fn max_batch_processing_attempts() -> u8 {
MAX_BATCH_PROCESSING_ATTEMPTS
}
}
/// Error type of a batch in a wrong state.
// Such errors should never be encountered.
pub struct WrongState(pub(super) String);
pub struct WrongState(pub(crate) String);
/// Auxiliary type alias for readability.
type IsFailed = bool;
/// A segment of a chain.
pub struct BatchInfo<T: EthSpec> {
pub struct BatchInfo<T: EthSpec, B: BatchConfig = RangeSyncBatchConfig> {
/// Start slot of the batch.
start_slot: Slot,
/// End slot of the batch.
@@ -33,6 +52,8 @@ pub struct BatchInfo<T: EthSpec> {
failed_download_attempts: Vec<PeerId>,
/// State of the batch.
state: BatchState<T>,
/// Pin the generic
marker: std::marker::PhantomData<B>,
}
/// Current state of a batch
@@ -73,7 +94,7 @@ impl<T: EthSpec> BatchState<T> {
}
}
impl<T: EthSpec> BatchInfo<T> {
impl<T: EthSpec, B: BatchConfig> BatchInfo<T, B> {
/// Batches are downloaded excluding the first block of the epoch assuming it has already been
/// downloaded.
///
@@ -91,6 +112,7 @@ impl<T: EthSpec> BatchInfo<T> {
failed_processing_attempts: Vec::new(),
failed_download_attempts: Vec::new(),
state: BatchState::AwaitingDownload,
marker: std::marker::PhantomData,
}
}
@@ -120,6 +142,7 @@ impl<T: EthSpec> BatchInfo<T> {
false
}
/// Returns the peer that is currently responsible for progressing the state of the batch.
pub fn current_peer(&self) -> Option<&PeerId> {
match &self.state {
BatchState::AwaitingDownload | BatchState::Failed => None,
@@ -131,6 +154,7 @@ impl<T: EthSpec> BatchInfo<T> {
}
}
/// Returns a BlocksByRange request associated with the batch.
pub fn to_blocks_by_range_request(&self) -> BlocksByRangeRequest {
BlocksByRangeRequest {
start_slot: self.start_slot.into(),
@@ -192,7 +216,7 @@ impl<T: EthSpec> BatchInfo<T> {
// can be tried again
self.failed_download_attempts.push(peer);
self.state = if self.failed_download_attempts.len()
>= MAX_BATCH_DOWNLOAD_ATTEMPTS as usize
>= B::max_batch_download_attempts() as usize
{
BatchState::Failed
} else {
@@ -219,14 +243,21 @@ impl<T: EthSpec> BatchInfo<T> {
}
}
/// Mark the batch as failed and return whether we can attempt a re-download.
///
/// This can happen if a peer disconnects or some error occurred that was not the peer's fault.
/// When the `mark_failed` parameter is false, the failed attempt is not recorded against the
/// peer (the batch's failed download attempts are not incremented); a re-download is attempted.
#[must_use = "Batch may have failed"]
pub fn download_failed(&mut self) -> Result<IsFailed, WrongState> {
pub fn download_failed(&mut self, mark_failed: bool) -> Result<IsFailed, WrongState> {
match self.state.poison() {
BatchState::Downloading(peer, _, _request_id) => {
// register the attempt and check if the batch can be tried again
self.failed_download_attempts.push(peer);
if mark_failed {
self.failed_download_attempts.push(peer);
}
self.state = if self.failed_download_attempts.len()
>= MAX_BATCH_DOWNLOAD_ATTEMPTS as usize
>= B::max_batch_download_attempts() as usize
{
BatchState::Failed
} else {
@@ -294,7 +325,7 @@ impl<T: EthSpec> BatchInfo<T> {
// check if the batch can be downloaded again
if self.failed_processing_attempts.len()
>= MAX_BATCH_PROCESSING_ATTEMPTS as usize
>= B::max_batch_processing_attempts() as usize
{
BatchState::Failed
} else {
@@ -324,7 +355,7 @@ impl<T: EthSpec> BatchInfo<T> {
// check if the batch can be downloaded again
self.state = if self.failed_processing_attempts.len()
>= MAX_BATCH_PROCESSING_ATTEMPTS as usize
>= B::max_batch_processing_attempts() as usize
{
BatchState::Failed
} else {
@@ -365,7 +396,7 @@ impl Attempt {
}
}
impl<T: EthSpec> slog::KV for &mut BatchInfo<T> {
impl<T: EthSpec, B: BatchConfig> slog::KV for &mut BatchInfo<T, B> {
fn serialize(
&self,
record: &slog::Record,
@@ -375,7 +406,7 @@ impl<T: EthSpec> slog::KV for &mut BatchInfo<T> {
}
}
impl<T: EthSpec> slog::KV for BatchInfo<T> {
impl<T: EthSpec, B: BatchConfig> slog::KV for BatchInfo<T, B> {
fn serialize(
&self,
record: &slog::Record,

View File

@@ -181,7 +181,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// fail the batches
for id in batch_ids {
if let Some(batch) = self.batches.get_mut(&id) {
if batch.download_failed()? {
if batch.download_failed(true)? {
return Err(RemoveChain::ChainFailed(id));
}
self.retry_batch_download(network, id)?;
@@ -273,7 +273,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
}
}
/// Sends to process the batch with the given id.
/// Processes the batch with the given id.
/// The batch must exist and be ready for processing
fn process_batch(
&mut self,
@@ -794,7 +794,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
if let Some(active_requests) = self.peers.get_mut(peer_id) {
active_requests.remove(&batch_id);
}
if batch.download_failed()? {
if batch.download_failed(true)? {
return Err(RemoveChain::ChainFailed(batch_id));
}
self.retry_batch_download(network, batch_id)
@@ -837,7 +837,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
}
}
/// Requests the batch asigned to the given id from a given peer.
/// Requests the batch assigned to the given id from a given peer.
pub fn send_batch(
&mut self,
network: &mut SyncNetworkContext<T::EthSpec>,
@@ -883,7 +883,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
self.peers
.get_mut(&peer)
.map(|request| request.remove(&batch_id));
if batch.download_failed()? {
if batch.download_failed(true)? {
return Err(RemoveChain::ChainFailed(batch_id));
} else {
return self.retry_batch_download(network, batch_id);
@@ -990,7 +990,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// this batch could have been included already being an optimistic batch
match self.batches.entry(batch_id) {
Entry::Occupied(_) => {
// this batch doesn't need downlading, let this same function decide the next batch
// this batch doesn't need downloading, let this same function decide the next batch
self.to_be_downloaded += EPOCHS_PER_BATCH;
self.include_next_batch()
}

View File

@@ -7,7 +7,7 @@ mod chain_collection;
mod range;
mod sync_type;
pub use batch::BatchInfo;
pub use batch::{BatchConfig, BatchInfo, BatchState};
pub use chain::{BatchId, ChainId, EPOCHS_PER_BATCH};
pub use range::RangeSync;
pub use sync_type::RangeSyncType;

View File

@@ -39,7 +39,7 @@
//! Each chain is downloaded in batches of blocks. The batched blocks are processed sequentially
//! and further batches are requested as current blocks are being processed.
use super::chain::{ChainId, RemoveChain, SyncingChain};
use super::chain::{BatchId, ChainId, RemoveChain, SyncingChain};
use super::chain_collection::ChainCollection;
use super::sync_type::RangeSyncType;
use crate::beacon_processor::WorkEvent as BeaconWorkEvent;
@@ -194,34 +194,29 @@ impl<T: BeaconChainTypes> RangeSync<T> {
&mut self,
network: &mut SyncNetworkContext<T::EthSpec>,
peer_id: PeerId,
chain_id: ChainId,
batch_id: BatchId,
request_id: RequestId,
beacon_block: Option<SignedBeaconBlock<T::EthSpec>>,
) {
// get the chain and batch for which this response belongs
if let Some((chain_id, batch_id)) =
network.blocks_by_range_response(request_id, beacon_block.is_none())
{
// check if this chunk removes the chain
match self.chains.call_by_id(chain_id, |chain| {
chain.on_block_response(network, batch_id, &peer_id, request_id, beacon_block)
}) {
Ok((removed_chain, sync_type)) => {
if let Some((removed_chain, remove_reason)) = removed_chain {
self.on_chain_removed(
removed_chain,
sync_type,
remove_reason,
network,
"block response",
);
}
}
Err(_) => {
trace!(self.log, "BlocksByRange response for removed chain"; "chain" => chain_id)
// check if this chunk removes the chain
match self.chains.call_by_id(chain_id, |chain| {
chain.on_block_response(network, batch_id, &peer_id, request_id, beacon_block)
}) {
Ok((removed_chain, sync_type)) => {
if let Some((removed_chain, remove_reason)) = removed_chain {
self.on_chain_removed(
removed_chain,
sync_type,
remove_reason,
network,
"block response",
);
}
}
} else {
trace!(self.log, "Response/Error for non registered request"; "request_id" => request_id)
Err(_) => {
trace!(self.log, "BlocksByRange response for removed chain"; "chain" => chain_id)
}
}
}
@@ -298,31 +293,28 @@ impl<T: BeaconChainTypes> RangeSync<T> {
&mut self,
network: &mut SyncNetworkContext<T::EthSpec>,
peer_id: PeerId,
batch_id: BatchId,
chain_id: ChainId,
request_id: RequestId,
) {
// get the chain and batch for which this response belongs
if let Some((chain_id, batch_id)) = network.blocks_by_range_response(request_id, true) {
// check that this request is pending
match self.chains.call_by_id(chain_id, |chain| {
chain.inject_error(network, batch_id, &peer_id, request_id)
}) {
Ok((removed_chain, sync_type)) => {
if let Some((removed_chain, remove_reason)) = removed_chain {
self.on_chain_removed(
removed_chain,
sync_type,
remove_reason,
network,
"RPC error",
);
}
}
Err(_) => {
trace!(self.log, "BlocksByRange response for removed chain"; "chain" => chain_id)
// check that this request is pending
match self.chains.call_by_id(chain_id, |chain| {
chain.inject_error(network, batch_id, &peer_id, request_id)
}) {
Ok((removed_chain, sync_type)) => {
if let Some((removed_chain, remove_reason)) = removed_chain {
self.on_chain_removed(
removed_chain,
sync_type,
remove_reason,
network,
"RPC error",
);
}
}
} else {
trace!(self.log, "Response/Error for non registered request"; "request_id" => request_id)
Err(_) => {
trace!(self.log, "BlocksByRange response for removed chain"; "chain" => chain_id)
}
}
}

View File

@@ -54,7 +54,8 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
.arg(
Arg::with_name("shutdown-after-sync")
.long("shutdown-after-sync")
.help("Shutdown beacon node as soon as sync is completed")
.help("Shutdown beacon node as soon as sync is completed. Backfill sync will \
not be performed before shutdown.")
.takes_value(false),
)
.arg(
@@ -479,12 +480,46 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
Arg::with_name("wss-checkpoint")
.long("wss-checkpoint")
.help(
"Used to input a Weak Subjectivity State Checkpoint in `block_root:epoch_number` format,\
where block_root is an '0x' prefixed 32-byte hex string and epoch_number is an integer."
"Specify a weak subjectivity checkpoint in `block_root:epoch` format to verify \
the node's sync against. The block root should be 0x-prefixed. Note that this \
flag is for verification only, to perform a checkpoint sync from a recent \
state use --checkpoint-sync-url."
)
.value_name("WSS_CHECKPOINT")
.takes_value(true)
)
.arg(
Arg::with_name("checkpoint-state")
.long("checkpoint-state")
.help("Set a checkpoint state to start syncing from. Must be aligned and match \
--checkpoint-block. Using --checkpoint-sync-url instead is recommended.")
.value_name("STATE_SSZ")
.takes_value(true)
.requires("checkpoint-block")
)
.arg(
Arg::with_name("checkpoint-block")
.long("checkpoint-block")
.help("Set a checkpoint block to start syncing from. Must be aligned and match \
--checkpoint-state. Using --checkpoint-sync-url instead is recommended.")
.value_name("BLOCK_SSZ")
.takes_value(true)
.requires("checkpoint-state")
)
.arg(
Arg::with_name("checkpoint-sync-url")
.long("checkpoint-sync-url")
.help("Set the remote beacon node HTTP endpoint to use for checkpoint sync.")
.value_name("BEACON_NODE")
.takes_value(true)
.conflicts_with("checkpoint-state")
)
.arg(
Arg::with_name("reconstruct-historic-states")
.long("reconstruct-historic-states")
.help("After a checkpoint sync, reconstruct historic states in the database.")
.takes_value(false)
)
.arg(
Arg::with_name("validator-monitor-auto")
.long("validator-monitor-auto")

View File

@@ -292,15 +292,62 @@ pub fn get_config<E: EthSpec>(
}
}
if let Some(genesis_state_bytes) = eth2_network_config.genesis_state_bytes {
// Note: re-serializing the genesis state is not so efficient, however it avoids adding
// trait bounds to the `ClientGenesis` enum. This would have significant flow-on
// effects.
client_config.genesis = ClientGenesis::SszBytes {
genesis_state_bytes,
};
client_config.genesis = if let Some(genesis_state_bytes) =
eth2_network_config.genesis_state_bytes
{
// Set up weak subjectivity sync, or start from the hardcoded genesis state.
if let (Some(initial_state_path), Some(initial_block_path)) = (
cli_args.value_of("checkpoint-state"),
cli_args.value_of("checkpoint-block"),
) {
let read = |path: &str| {
use std::fs::File;
use std::io::Read;
File::open(Path::new(path))
.and_then(|mut f| {
let mut buffer = vec![];
f.read_to_end(&mut buffer)?;
Ok(buffer)
})
.map_err(|e| format!("Unable to open {}: {:?}", path, e))
};
let anchor_state_bytes = read(initial_state_path)?;
let anchor_block_bytes = read(initial_block_path)?;
ClientGenesis::WeakSubjSszBytes {
genesis_state_bytes,
anchor_state_bytes,
anchor_block_bytes,
}
} else if let Some(remote_bn_url) = cli_args.value_of("checkpoint-sync-url") {
let url = SensitiveUrl::parse(remote_bn_url)
.map_err(|e| format!("Invalid checkpoint sync URL: {:?}", e))?;
ClientGenesis::CheckpointSyncUrl {
genesis_state_bytes,
url,
}
} else {
// Note: re-serializing the genesis state is not so efficient, however it avoids adding
// trait bounds to the `ClientGenesis` enum. This would have significant flow-on
// effects.
ClientGenesis::SszBytes {
genesis_state_bytes,
}
}
} else {
client_config.genesis = ClientGenesis::DepositContract;
if cli_args.is_present("checkpoint-state") || cli_args.is_present("checkpoint-sync-url") {
return Err(
"Checkpoint sync is not available for this network as no genesis state is known"
.to_string(),
);
}
ClientGenesis::DepositContract
};
if cli_args.is_present("reconstruct-historic-states") {
client_config.chain.reconstruct_historic_states = true;
}
let raw_graffiti = if let Some(graffiti) = cli_args.value_of("graffiti") {

View File

@@ -0,0 +1,75 @@
use crate::chunked_vector::{chunk_key, Chunk, ChunkError, Field};
use crate::{Error, KeyValueStore, KeyValueStoreOp};
use types::EthSpec;
/// Buffered writer for chunked vectors (block roots mainly).
///
/// Keeps exactly one chunk of the field `F` in memory. Values are placed into that chunk with
/// `set`, and the chunk is staged into a batch of `KeyValueStoreOp`s with `write`.
pub struct ChunkWriter<'a, F, E, S>
where
F: Field<E>,
E: EthSpec,
S: KeyValueStore<E>,
{
/// Buffered chunk awaiting writing to disk (always dirty).
chunk: Chunk<F::Value>,
/// Chunk index of `chunk`.
index: usize,
/// Store that existing chunks are loaded from when the writer moves to a new chunk index.
store: &'a S,
}
impl<'a, F, E, S> ChunkWriter<'a, F, E, S>
where
    F: Field<E>,
    E: EthSpec,
    S: KeyValueStore<E>,
{
    /// Create a writer positioned on the chunk that contains vector index `vindex`.
    ///
    /// The chunk is loaded from `store` if it already exists there; otherwise a fresh chunk of
    /// `F::chunk_size()` default values is used.
    pub fn new(store: &'a S, vindex: usize) -> Result<Self, Error> {
        let index = F::chunk_index(vindex);
        let key = chunk_key(index);
        let chunk = match Chunk::load(store, F::column(), &key)? {
            Some(existing) => existing,
            None => Chunk::new(vec![F::Value::default(); F::chunk_size()]),
        };
        Ok(Self {
            chunk,
            index,
            store,
        })
    }

    /// Set the value at vector index `vindex`.
    ///
    /// If `vindex` falls in a different chunk to the one currently buffered, the buffered chunk
    /// is first staged into `batch` and the writer is re-created on the new chunk.
    ///
    /// Returns `ChunkError::Inconsistent` if the position already holds a non-default value
    /// that differs from `value`.
    pub fn set(
        &mut self,
        vindex: usize,
        value: F::Value,
        batch: &mut Vec<KeyValueStoreOp>,
    ) -> Result<(), Error> {
        let target_chunk = F::chunk_index(vindex);

        // Moving to another chunk: flush the buffered one, then load (or create) the target.
        if target_chunk != self.index {
            self.write(batch)?;
            *self = Self::new(self.store, vindex)?;
        }

        let offset = vindex % F::chunk_size();
        let current = &self.chunk.values[offset];

        // Overwriting is only permitted when the slot is unset (default) or already equal.
        let compatible = current == &value || current == &F::Value::default();
        if !compatible {
            return Err(ChunkError::Inconsistent {
                field: F::column(),
                chunk_index: target_chunk,
                existing_value: format!("{:?}", current),
                new_value: format!("{:?}", value),
            }
            .into());
        }

        self.chunk.values[offset] = value;
        Ok(())
    }

    /// Stage the buffered chunk for writing by adding it to `batch`.
    ///
    /// Should be called before the writer is dropped, in order to write the final chunk to disk.
    pub fn write(&self, batch: &mut Vec<KeyValueStoreOp>) -> Result<(), Error> {
        let key = chunk_key(self.index);
        self.chunk.store(F::column(), &key, batch)
    }
}

View File

@@ -97,7 +97,7 @@ where
self.current_chunk = Chunk::load(
&self.store.cold_db,
F::column(),
&chunk_key(self.next_cindex as u64),
&chunk_key(self.next_cindex),
)
.map_err(|e| {
error!(

View File

@@ -34,8 +34,8 @@ pub enum UpdatePattern {
/// Map a chunk index to bytes that can be used to key the NoSQL database.
///
/// We shift chunks up by 1 to make room for a genesis chunk that is handled separately.
pub fn chunk_key(cindex: u64) -> [u8; 8] {
(cindex + 1).to_be_bytes()
pub fn chunk_key(cindex: usize) -> [u8; 8] {
(cindex as u64 + 1).to_be_bytes()
}
/// Return the database key for the genesis value.
@@ -73,6 +73,11 @@ pub trait Field<E: EthSpec>: Copy {
128
}
/// Convert a v-index (vector index) to the index of the chunk that contains it.
///
/// This is integer division of `vindex` by `Self::chunk_size()`.
fn chunk_index(vindex: usize) -> usize {
    let values_per_chunk = Self::chunk_size();
    vindex / values_per_chunk
}
/// Get the value of this field at the given vector index, from the state.
fn get_value(
state: &BeaconState<E>,
@@ -399,7 +404,7 @@ where
I: Iterator<Item = usize>,
{
for chunk_index in range {
let chunk_key = &chunk_key(chunk_index as u64)[..];
let chunk_key = &chunk_key(chunk_index)[..];
let existing_chunk =
Chunk::<F::Value>::load(store, F::column(), chunk_key)?.unwrap_or_else(Chunk::default);
@@ -440,7 +445,7 @@ fn range_query<S: KeyValueStore<E>, E: EthSpec, T: Decode + Encode>(
let mut result = Vec::with_capacity(len);
for chunk_index in range {
let key = &chunk_key(chunk_index as u64)[..];
let key = &chunk_key(chunk_index)[..];
let chunk = Chunk::load(store, column, key)?.ok_or(ChunkError::Missing { chunk_index })?;
result.push(chunk);
}

View File

@@ -24,8 +24,6 @@ pub struct StoreConfig {
#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)]
pub struct OnDiskStoreConfig {
pub slots_per_restore_point: u64,
// NOTE: redundant, see https://github.com/sigp/lighthouse/issues/1784
pub _block_cache_size: usize,
}
#[derive(Debug, Clone)]
@@ -49,7 +47,6 @@ impl StoreConfig {
pub fn as_disk_config(&self) -> OnDiskStoreConfig {
OnDiskStoreConfig {
slots_per_restore_point: self.slots_per_restore_point,
_block_cache_size: DEFAULT_BLOCK_CACHE_SIZE,
}
}

View File

@@ -13,13 +13,46 @@ pub enum Error {
BeaconStateError(BeaconStateError),
PartialBeaconStateError,
HotColdDBError(HotColdDBError),
DBError { message: String },
DBError {
message: String,
},
RlpError(String),
BlockNotFound(Hash256),
NoContinuationData,
SplitPointModified(Slot, Slot),
ConfigError(StoreConfigError),
SchemaMigrationError(String),
/// The store's `anchor_info` was mutated concurrently, the latest modification wasn't applied.
AnchorInfoConcurrentMutation,
/// The block or state is unavailable due to weak subjectivity sync.
HistoryUnavailable,
/// State reconstruction cannot commence because not all historic blocks are known.
MissingHistoricBlocks {
oldest_block_slot: Slot,
},
/// State reconstruction failed because it didn't reach the upper limit slot.
///
/// This should never happen (it's a logic error).
StateReconstructionDidNotComplete,
StateReconstructionRootMismatch {
slot: Slot,
expected: Hash256,
computed: Hash256,
},
}
/// Extension trait for turning a `HistoryUnavailable` error into an absent value.
pub trait HandleUnavailable<T> {
    /// Map `Err(Error::HistoryUnavailable)` to `Ok(None)` and `Ok(x)` to `Ok(Some(x))`.
    ///
    /// Every other error is returned unchanged.
    fn handle_unavailable(self) -> std::result::Result<Option<T>, Error>;
}

impl<T> HandleUnavailable<T> for Result<T> {
    fn handle_unavailable(self) -> std::result::Result<Option<T>, Error> {
        match self {
            Err(Error::HistoryUnavailable) => Ok(None),
            // Successes are wrapped in `Some`; all remaining errors pass straight through.
            other => other.map(Some),
        }
    }
}
impl From<DecodeError> for Error {

View File

@@ -3,17 +3,15 @@ use crate::chunked_vector::{
};
use crate::config::{OnDiskStoreConfig, StoreConfig};
use crate::forwards_iter::{HybridForwardsBlockRootsIterator, HybridForwardsStateRootsIterator};
use crate::impls::{
beacon_block_as_kv_store_op,
beacon_state::{get_full_state, store_full_state},
};
use crate::impls::beacon_state::{get_full_state, store_full_state};
use crate::iter::{ParentRootBlockIterator, StateRootsIterator};
use crate::leveldb_store::BytesKey;
use crate::leveldb_store::LevelDB;
use crate::memory_store::MemoryStore;
use crate::metadata::{
CompactionTimestamp, PruningCheckpoint, SchemaVersion, COMPACTION_TIMESTAMP_KEY, CONFIG_KEY,
CURRENT_SCHEMA_VERSION, PRUNING_CHECKPOINT_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY,
AnchorInfo, CompactionTimestamp, PruningCheckpoint, SchemaVersion, ANCHOR_INFO_KEY,
COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, CURRENT_SCHEMA_VERSION, PRUNING_CHECKPOINT_KEY,
SCHEMA_VERSION_KEY, SPLIT_KEY,
};
use crate::metrics;
use crate::{
@@ -23,13 +21,15 @@ use crate::{
use leveldb::iterator::LevelDBIterator;
use lru::LruCache;
use parking_lot::{Mutex, RwLock};
use slog::{debug, error, info, trace, warn, Logger};
use serde_derive::{Deserialize, Serialize};
use slog::{debug, error, info, trace, Logger};
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use state_processing::{
per_block_processing, per_slot_processing, BlockProcessingError, BlockSignatureStrategy,
SlotProcessingError,
};
use std::cmp::min;
use std::convert::TryInto;
use std::marker::PhantomData;
use std::path::Path;
@@ -57,8 +57,10 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
///
/// States with slots less than `split.slot` are in the cold DB, while states with slots
/// greater than or equal are in the hot DB.
split: RwLock<Split>,
config: StoreConfig,
pub(crate) split: RwLock<Split>,
/// The starting slots for the range of blocks & states stored in the database.
anchor_info: RwLock<Option<AnchorInfo>>,
pub(crate) config: StoreConfig,
/// Cold database containing compact historical data.
pub cold_db: Cold,
/// Hot database containing duplicated but quick-to-access recent data.
@@ -68,7 +70,7 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
/// LRU cache of deserialized blocks. Updated whenever a block is loaded.
block_cache: Mutex<LruCache<Hash256, SignedBeaconBlock<E>>>,
/// Chain spec.
spec: ChainSpec,
pub(crate) spec: ChainSpec,
/// Logger.
pub(crate) log: Logger,
/// Mere vessel for E.
@@ -95,11 +97,13 @@ pub enum HotColdDBError {
MissingHotStateSummary(Hash256),
MissingEpochBoundaryState(Hash256),
MissingSplitState(Hash256, Slot),
MissingAnchorInfo,
HotStateSummaryError(BeaconStateError),
RestorePointDecodeError(ssz::DecodeError),
BlockReplayBeaconError(BeaconStateError),
BlockReplaySlotError(SlotProcessingError),
BlockReplayBlockError(BlockProcessingError),
MissingLowerLimitState(Slot),
InvalidSlotsPerRestorePoint {
slots_per_restore_point: u64,
slots_per_historical_root: u64,
@@ -126,6 +130,7 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
let db = HotColdDB {
split: RwLock::new(Split::default()),
anchor_info: RwLock::new(None),
cold_db: MemoryStore::open(),
hot_db: MemoryStore::open(),
block_cache: Mutex::new(LruCache::new(config.block_cache_size)),
@@ -158,6 +163,7 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
let db = Arc::new(HotColdDB {
split: RwLock::new(Split::default()),
anchor_info: RwLock::new(None),
cold_db: LevelDB::open(cold_path)?,
hot_db: LevelDB::open(hot_path)?,
block_cache: Mutex::new(LruCache::new(config.block_cache_size)),
@@ -190,6 +196,9 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
// Load the previous split slot from the database (if any). This ensures we can
// stop and restart correctly.
if let Some(split) = db.load_split()? {
*db.split.write() = split;
*db.anchor_info.write() = db.load_anchor_info()?;
info!(
db.log,
"Hot-Cold DB initialized";
@@ -197,7 +206,6 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
"split_slot" => split.slot,
"split_state" => format!("{:?}", split.state_root)
);
*db.split.write() = split;
}
// Run a garbage collection pass.
@@ -243,8 +251,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
block: SignedBeaconBlock<E>,
) -> Result<(), Error> {
// Store on disk.
self.hot_db
.do_atomically(vec![beacon_block_as_kv_store_op(block_root, &block)])?;
let op = self.block_as_kv_store_op(block_root, &block);
self.hot_db.do_atomically(vec![op])?;
// Update cache.
self.block_cache.lock().put(*block_root, block);
@@ -252,6 +260,18 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
Ok(())
}
/// Build the key-value operation that stores `block` under `key` in the `BeaconBlock` column.
///
/// The block is SSZ-encoded. Nothing is written here; the caller must commit the returned op.
#[must_use]
pub fn block_as_kv_store_op(
    &self,
    key: &Hash256,
    block: &SignedBeaconBlock<E>,
) -> KeyValueStoreOp {
    // FIXME(altair): re-add block write/overhead metrics, or remove them
    let column_key = get_key_for_col(DBColumn::BeaconBlock.into(), key.as_bytes());
    let ssz_bytes = block.as_ssz_bytes();
    KeyValueStoreOp::PutKeyValue(column_key, ssz_bytes)
}
/// Fetch a block from the store.
pub fn get_block(&self, block_root: &Hash256) -> Result<Option<SignedBeaconBlock<E>>, Error> {
metrics::inc_counter(&metrics::BEACON_BLOCK_GET_COUNT);
@@ -467,7 +487,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
Some(state_slot) => {
let epoch_boundary_slot =
state_slot / E::slots_per_epoch() * E::slots_per_epoch();
self.load_cold_state_by_slot(epoch_boundary_slot).map(Some)
self.load_cold_state_by_slot(epoch_boundary_slot)
}
None => Ok(None),
}
@@ -492,7 +512,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
for op in batch {
match op {
StoreOp::PutBlock(block_root, block) => {
key_value_batch.push(beacon_block_as_kv_store_op(block_root, block));
key_value_batch.push(self.block_as_kv_store_op(block_root, block));
}
StoreOp::PutState(state_root, state) => {
@@ -563,6 +583,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
Ok(())
}
/// Store a post-finalization state efficiently in the hot database.
///
/// On an epoch boundary, store a full state. On an intermediate slot, store
@@ -639,21 +660,16 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
/// Store a pre-finalization state in the freezer database.
///
/// Will log a warning and not store anything if the state does not lie on a restore point
/// boundary.
/// If the state doesn't lie on a restore point boundary then just its summary will be stored.
pub fn store_cold_state(
&self,
state_root: &Hash256,
state: &BeaconState<E>,
ops: &mut Vec<KeyValueStoreOp>,
) -> Result<(), Error> {
ops.push(ColdStateSummary { slot: state.slot() }.as_kv_store_op(*state_root));
if state.slot() % self.config.slots_per_restore_point != 0 {
warn!(
self.log,
"Not storing non-restore point state in freezer";
"slot" => state.slot().as_u64(),
"state_root" => format!("{:?}", state_root)
);
return Ok(());
}
@@ -688,7 +704,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
/// Return `None` if no state with `state_root` lies in the freezer.
pub fn load_cold_state(&self, state_root: &Hash256) -> Result<Option<BeaconState<E>>, Error> {
match self.load_cold_state_slot(state_root)? {
Some(slot) => self.load_cold_state_by_slot(slot).map(Some),
Some(slot) => self.load_cold_state_by_slot(slot),
None => Ok(None),
}
}
@@ -696,12 +712,22 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
/// Load a pre-finalization state from the freezer database.
///
/// Will reconstruct the state if it lies between restore points.
pub fn load_cold_state_by_slot(&self, slot: Slot) -> Result<BeaconState<E>, Error> {
if slot % self.config.slots_per_restore_point == 0 {
let restore_point_idx = slot.as_u64() / self.config.slots_per_restore_point;
self.load_restore_point_by_index(restore_point_idx)
pub fn load_cold_state_by_slot(&self, slot: Slot) -> Result<Option<BeaconState<E>>, Error> {
// Guard against fetching states that do not exist due to gaps in the historic state
// database, which can occur due to checkpoint sync or re-indexing.
// See the comments in `get_historic_state_limits` for more information.
let (lower_limit, upper_limit) = self.get_historic_state_limits();
if slot <= lower_limit || slot >= upper_limit {
if slot % self.config.slots_per_restore_point == 0 {
let restore_point_idx = slot.as_u64() / self.config.slots_per_restore_point;
self.load_restore_point_by_index(restore_point_idx)
} else {
self.load_cold_intermediate_state(slot)
}
.map(Some)
} else {
self.load_cold_intermediate_state(slot)
Ok(None)
}
}
@@ -742,17 +768,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
let split = self.split.read_recursive();
let low_restore_point = self.load_restore_point_by_index(low_restore_point_idx)?;
// If the slot of the high point lies outside the freezer, use the split state
// as the upper restore point.
let high_restore_point = if high_restore_point_idx * self.config.slots_per_restore_point
>= split.slot.as_u64()
{
self.get_state(&split.state_root, Some(split.slot))?.ok_or(
HotColdDBError::MissingSplitState(split.state_root, split.slot),
)?
} else {
self.load_restore_point_by_index(high_restore_point_idx)?
};
let high_restore_point = self.get_restore_point(high_restore_point_idx, &split)?;
// 2. Load the blocks from the high restore point back to the low restore point.
let blocks = self.load_blocks_to_replay(
@@ -765,6 +781,24 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
self.replay_blocks(low_restore_point, blocks, slot, BlockReplay::Accurate)
}
/// Load the restore point with index `restore_point_idx`.
///
/// If that restore point's slot is at or beyond `split.slot`, the split state is loaded
/// instead, and `MissingSplitState` is returned if the split state cannot be found.
pub(crate) fn get_restore_point(
    &self,
    restore_point_idx: u64,
    split: &Split,
) -> Result<BeaconState<E>, Error> {
    let restore_point_slot = restore_point_idx * self.config.slots_per_restore_point;

    // The restore point lies strictly below the split, so load it by index.
    if restore_point_slot < split.slot.as_u64() {
        return self.load_restore_point_by_index(restore_point_idx);
    }

    match self.get_state(&split.state_root, Some(split.slot))? {
        Some(split_state) => Ok(split_state),
        None => Err(HotColdDBError::MissingSplitState(split.state_root, split.slot).into()),
    }
}
/// Get a suitable block root for backtracking from `high_restore_point` to the state at `slot`.
///
/// Defaults to the block root for `slot`, which *should* be in range.
@@ -800,12 +834,21 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
.as_ref()
.map_or(true, |block| block.slot() <= end_slot)
})
// Include the block at the start slot (if any). Whilst it doesn't need to be applied
// to the state, it contains a potentially useful state root.
.take_while(|result| {
result
.as_ref()
.map_or(true, |block| block.slot() >= start_slot)
// Include the block at the start slot (if any). Whilst it doesn't need to be
// applied to the state, it contains a potentially useful state root.
//
// Return `true` on an `Err` so that the `collect` fails, unless the error is a
// `BlockNotFound` error and some blocks are intentionally missing from the DB.
// This complexity is unfortunately necessary to avoid loading the parent of the
// oldest known block -- we can't know that we have all the required blocks until we
// load a block with slot less than the start slot, which is impossible if there are
// no blocks with slot less than the start slot.
.take_while(|result| match result {
Ok(block) => block.slot() >= start_slot,
Err(Error::BlockNotFound(_)) => {
self.get_oldest_block_slot() == self.spec.genesis_slot
}
Err(_) => true,
})
.collect::<Result<_, _>>()?;
blocks.reverse();
@@ -904,6 +947,15 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
self.split.read_recursive().slot
}
/// Fetch a copy of the current split point (slot and state root) from memory.
pub fn get_split_info(&self) -> Split {
    let split_guard = self.split.read_recursive();
    *split_guard
}
/// Overwrite the in-memory split point with the given slot and state root.
///
/// Only the in-memory value is updated by this function.
pub fn set_split(&self, slot: Slot, state_root: Hash256) {
    let new_split = Split { slot, state_root };
    *self.split.write() = new_split;
}
/// Fetch the slot of the most recently stored restore point.
pub fn get_latest_restore_point_slot(&self) -> Slot {
(self.get_split_slot() - 1) / self.config.slots_per_restore_point
@@ -920,6 +972,122 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
self.hot_db.put(&SCHEMA_VERSION_KEY, &schema_version)
}
/// Initialise the anchor info for checkpoint sync starting from `block`.
///
/// Fails with `AnchorInfoConcurrentMutation` if anchor info is already set, because the
/// update is a compare-and-set from `None`.
pub fn init_anchor_info(&self, block: BeaconBlockRef<'_, E>) -> Result<(), Error> {
    let sprp = self.config.slots_per_restore_point;
    let anchor_slot = block.slot();

    // The state upper limit is the anchor slot rounded *up* to a restore point boundary
    // (unchanged if already aligned). See `get_historic_state_limits` for the rationale.
    let is_restore_point = anchor_slot % sprp == 0;
    let state_upper_limit = if is_restore_point {
        anchor_slot
    } else {
        (anchor_slot / sprp + 1) * sprp
    };

    let new_anchor = AnchorInfo {
        anchor_slot,
        oldest_block_slot: anchor_slot,
        oldest_block_parent: block.parent_root(),
        state_upper_limit,
        state_lower_limit: self.spec.genesis_slot,
    };
    self.compare_and_set_anchor_info(None, Some(new_anchor))
}
/// Return a clone of the store's in-memory anchor info (`None` if no anchor is set).
///
/// To do mutations, use `compare_and_set_anchor_info`.
pub fn get_anchor_info(&self) -> Option<AnchorInfo> {
    let guard = self.anchor_info.read_recursive();
    (*guard).clone()
}
/// Atomically update the anchor info from `prev_value` to `new_value`.
///
/// Return an `AnchorInfoConcurrentMutation` error if the `prev_value` provided
/// is not correct.
pub fn compare_and_set_anchor_info(
&self,
prev_value: Option<AnchorInfo>,
new_value: Option<AnchorInfo>,
) -> Result<(), Error> {
let mut anchor_info = self.anchor_info.write();
if *anchor_info == prev_value {
self.store_anchor_info(&new_value)?;
*anchor_info = new_value;
Ok(())
} else {
Err(Error::AnchorInfoConcurrentMutation)
}
}
/// Read the anchor info stored under `ANCHOR_INFO_KEY` in the hot database.
///
/// This does not set `self.anchor_info`; the caller decides what to do with the result.
fn load_anchor_info(&self) -> Result<Option<AnchorInfo>, Error> {
    let stored: Option<AnchorInfo> = self.hot_db.get(&ANCHOR_INFO_KEY)?;
    Ok(stored)
}
/// Persist `anchor_info` to the hot database: `Some` is stored, `None` deletes the entry.
///
/// The argument is intended to be `self.anchor_info`, but is passed manually to avoid issues
/// with recursive locking.
fn store_anchor_info(&self, anchor_info: &Option<AnchorInfo>) -> Result<(), Error> {
    match anchor_info {
        Some(info) => {
            self.hot_db.put(&ANCHOR_INFO_KEY, info)?;
        }
        None => {
            self.hot_db.delete::<AnchorInfo>(&ANCHOR_INFO_KEY)?;
        }
    }
    Ok(())
}
/// Return the `anchor_slot` of the in-memory anchor info, or `None` if there is no anchor.
pub fn get_anchor_slot(&self) -> Option<Slot> {
    let guard = self.anchor_info.read_recursive();
    guard.as_ref().map(|anchor| anchor.anchor_slot)
}
/// Return the slot-window describing the available historic states, as
/// `(lower_limit, upper_limit)`.
///
/// - `lower_limit`: the maximum slot such that frozen states are available for all slots
///   less than or equal to it.
/// - `upper_limit`: the minimum slot such that frozen states are available for all slots
///   greater than or equal to it.
///
/// If `lower_limit >= upper_limit` then all states are available. That is the case when no
/// anchor is set, where this function returns `(split_slot, genesis_slot)`.
pub fn get_historic_state_limits(&self) -> (Slot, Slot) {
    // After a checkpoint sync, states in the hot DB are always available, but they may become
    // unavailable as finalisation advances if no restore point exists for them in the
    // database. The upper limit is therefore the minimum of the split slot and the
    // restore-point-aligned `state_upper_limit`, which should be set _ahead_ of the checkpoint
    // slot during initialisation.
    //
    // E.g. starting from a checkpoint at slot 2048+1024=3072 with SPRP=2048, states with
    // slots 3072-4095 are available only while they are in the hot database, and the current
    // split slot is returned as the upper limit. Once slot 4096 is reached a new restore
    // point is created at that slot, making all states from 4096 onwards permanently
    // available.
    let split_slot = self.get_split_slot();
    let anchor_guard = self.anchor_info.read_recursive();
    match anchor_guard.as_ref() {
        None => (split_slot, self.spec.genesis_slot),
        Some(anchor) => (
            anchor.state_lower_limit,
            min(anchor.state_upper_limit, split_slot),
        ),
    }
}
/// Return the minimum slot such that blocks are available for all subsequent slots.
///
/// This is the anchor's `oldest_block_slot`, or the genesis slot when no anchor is set.
pub fn get_oldest_block_slot(&self) -> Slot {
    let guard = self.anchor_info.read_recursive();
    match guard.as_ref() {
        Some(anchor) => anchor.oldest_block_slot,
        None => self.spec.genesis_slot,
    }
}
/// Load previously-stored config from disk.
fn load_config(&self) -> Result<Option<OnDiskStoreConfig>, Error> {
self.hot_db.get(&CONFIG_KEY)
@@ -935,6 +1103,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
self.hot_db.get(&SPLIT_KEY)
}
/// Write the current in-memory split point to the hot database under `SPLIT_KEY`, using
/// `put_sync`.
pub fn store_split(&self) -> Result<(), Error> {
    let split = self.split.read();
    self.hot_db.put_sync(&SPLIT_KEY, &*split)?;
    Ok(())
}
/// Load the state root of a restore point.
fn load_restore_point_hash(&self, restore_point_index: u64) -> Result<Hash256, Error> {
let key = Self::restore_point_key(restore_point_index);
@@ -1037,6 +1211,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
.map(|pc: PruningCheckpoint| pc.checkpoint))
}
/// Store the checkpoint to begin pruning from (the "old finalized checkpoint").
///
/// The store op is built by `pruning_checkpoint_store_op` and committed on its own.
pub fn store_pruning_checkpoint(&self, checkpoint: Checkpoint) -> Result<(), Error> {
    let op = self.pruning_checkpoint_store_op(checkpoint);
    self.hot_db.do_atomically(vec![op])
}
/// Create a staged store for the pruning checkpoint.
pub fn pruning_checkpoint_store_op(&self, checkpoint: Checkpoint) -> KeyValueStoreOp {
PruningCheckpoint { checkpoint }.as_kv_store_op(PRUNING_CHECKPOINT_KEY)
@@ -1075,6 +1255,11 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
// The new frozen head must increase the current split slot, and lie on an epoch
// boundary (in order for the hot state summary scheme to work).
let current_split_slot = store.split.read_recursive().slot;
let anchor_slot = store
.anchor_info
.read_recursive()
.as_ref()
.map(|a| a.anchor_slot);
if frozen_head.slot() < current_split_slot {
return Err(HotColdDBError::FreezeSlotError {
@@ -1094,7 +1279,10 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
// to the cold DB.
let state_root_iter = StateRootsIterator::new(store.clone(), frozen_head);
for maybe_pair in state_root_iter.take_while(|result| match result {
Ok((_, slot)) => slot >= &current_split_slot,
Ok((_, slot)) => {
slot >= &current_split_slot
&& anchor_slot.map_or(true, |anchor_slot| slot >= &anchor_slot)
}
Err(_) => true,
}) {
let (state_root, slot) = maybe_pair?;
@@ -1183,10 +1371,10 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
}
/// Struct for storing the split slot and state root in the database.
#[derive(Debug, Clone, Copy, Default, Encode, Decode)]
#[derive(Debug, Clone, Copy, PartialEq, Default, Encode, Decode, Deserialize, Serialize)]
pub struct Split {
slot: Slot,
state_root: Hash256,
pub(crate) slot: Slot,
pub(crate) state_root: Hash256,
}
impl StoreItem for Split {
@@ -1252,8 +1440,8 @@ impl HotStateSummary {
/// Struct for summarising a state in the freezer database.
#[derive(Debug, Clone, Copy, Default, Encode, Decode)]
struct ColdStateSummary {
slot: Slot,
pub(crate) struct ColdStateSummary {
pub slot: Slot,
}
impl StoreItem for ColdStateSummary {

View File

@@ -1,15 +1 @@
use crate::*;
use ssz::Encode;
pub mod beacon_state;
/// Prepare a signed beacon block for storage in the database.
#[must_use]
pub fn beacon_block_as_kv_store_op<T: EthSpec>(
key: &Hash256,
block: &SignedBeaconBlock<T>,
) -> KeyValueStoreOp {
// FIXME(altair): re-add block write/overhead metrics, or remove them
let db_key = get_key_for_col(DBColumn::BeaconBlock.into(), key.as_bytes());
KeyValueStoreOp::PutKeyValue(db_key, block.as_ssz_bytes())
}

View File

@@ -1,3 +1,4 @@
use crate::errors::HandleUnavailable;
use crate::{Error, HotColdDB, ItemStore};
use std::borrow::Cow;
use std::marker::PhantomData;
@@ -201,15 +202,20 @@ impl<'a, T: EthSpec, Hot: ItemStore<T>, Cold: ItemStore<T>> RootsIterator<'a, T,
(Ok(block_root), Ok(state_root)) => Ok(Some((*block_root, *state_root, self.slot))),
(Err(BeaconStateError::SlotOutOfBounds), Err(BeaconStateError::SlotOutOfBounds)) => {
// Read a `BeaconState` from the store that has access to prior historical roots.
let beacon_state =
next_historical_root_backtrack_state(&*self.store, &self.beacon_state)?;
if let Some(beacon_state) =
next_historical_root_backtrack_state(&*self.store, &self.beacon_state)
.handle_unavailable()?
{
self.beacon_state = Cow::Owned(beacon_state);
self.beacon_state = Cow::Owned(beacon_state);
let block_root = *self.beacon_state.get_block_root(self.slot)?;
let state_root = *self.beacon_state.get_state_root(self.slot)?;
let block_root = *self.beacon_state.get_block_root(self.slot)?;
let state_root = *self.beacon_state.get_state_root(self.slot)?;
Ok(Some((block_root, state_root, self.slot)))
Ok(Some((block_root, state_root, self.slot)))
} else {
// No more states available due to weak subjectivity sync.
Ok(None)
}
}
(Err(e), _) => Err(e.into()),
(Ok(_), Err(e)) => Err(e.into()),
@@ -329,6 +335,9 @@ impl<'a, T: EthSpec, Hot: ItemStore<T>, Cold: ItemStore<T>> Iterator
}
/// Fetch the next state to use whilst backtracking in `*RootsIterator`.
///
/// Return `Err(HistoryUnavailable)` in the case where no more backtrack states are available
/// due to weak subjectivity sync.
fn next_historical_root_backtrack_state<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
store: &HotColdDB<E, Hot, Cold>,
current_state: &BeaconState<E>,
@@ -338,10 +347,17 @@ fn next_historical_root_backtrack_state<E: EthSpec, Hot: ItemStore<E>, Cold: Ite
// not frozen, this just means we might not jump back by the maximum amount on
// our first jump (i.e. at most 1 extra state load).
let new_state_slot = slot_of_prev_restore_point::<E>(current_state.slot());
let new_state_root = current_state.get_state_root(new_state_slot)?;
Ok(store
.get_state(new_state_root, Some(new_state_slot))?
.ok_or_else(|| BeaconStateError::MissingBeaconState((*new_state_root).into()))?)
let (_, historic_state_upper_limit) = store.get_historic_state_limits();
if new_state_slot >= historic_state_upper_limit {
let new_state_root = current_state.get_state_root(new_state_slot)?;
Ok(store
.get_state(new_state_root, Some(new_state_slot))?
.ok_or_else(|| BeaconStateError::MissingBeaconState((*new_state_root).into()))?)
} else {
Err(Error::HistoryUnavailable)
}
}
/// Compute the slot of the last guaranteed restore point in the freezer database.

View File

@@ -10,6 +10,7 @@
#[macro_use]
extern crate lazy_static;
mod chunk_writer;
pub mod chunked_iter;
pub mod chunked_vector;
pub mod config;
@@ -23,9 +24,11 @@ mod memory_store;
pub mod metadata;
pub mod metrics;
mod partial_beacon_state;
pub mod reconstruct;
pub mod iter;
pub use self::chunk_writer::ChunkWriter;
pub use self::config::StoreConfig;
pub use self::hot_cold_store::{BlockReplay, HotColdDB, HotStateSummary, Split};
pub use self::leveldb_store::LevelDB;
@@ -33,6 +36,7 @@ pub use self::memory_store::MemoryStore;
pub use self::partial_beacon_state::PartialBeaconState;
pub use errors::Error;
pub use impls::beacon_state::StorageContainer as BeaconStateStorageContainer;
pub use metadata::AnchorInfo;
pub use metrics::scrape_for_metrics;
use parking_lot::MutexGuard;
pub use types::*;

View File

@@ -1,8 +1,10 @@
use crate::{DBColumn, Error, StoreItem};
use serde_derive::{Deserialize, Serialize};
use ssz::{Decode, Encode};
use types::{Checkpoint, Hash256};
use ssz_derive::{Decode, Encode};
use types::{Checkpoint, Hash256, Slot};
pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(4);
pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(5);
// All the keys that get stored under the `BeaconMeta` column.
//
@@ -12,6 +14,7 @@ pub const CONFIG_KEY: Hash256 = Hash256::repeat_byte(1);
pub const SPLIT_KEY: Hash256 = Hash256::repeat_byte(2);
pub const PRUNING_CHECKPOINT_KEY: Hash256 = Hash256::repeat_byte(3);
pub const COMPACTION_TIMESTAMP_KEY: Hash256 = Hash256::repeat_byte(4);
pub const ANCHOR_INFO_KEY: Hash256 = Hash256::repeat_byte(5);
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct SchemaVersion(pub u64);
@@ -76,3 +79,41 @@ impl StoreItem for CompactionTimestamp {
Ok(CompactionTimestamp(u64::from_ssz_bytes(bytes)?))
}
}
/// Database parameters relevant to weak subjectivity sync.
///
/// Stored in the `BeaconMeta` column (see the `StoreItem` impl) as SSZ bytes.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize)]
pub struct AnchorInfo {
/// The slot at which the anchor state is present and which we cannot revert.
pub anchor_slot: Slot,
/// The slot from which historical blocks are available (>=).
///
/// `block_backfill_complete` reports completion once this reaches 0.
pub oldest_block_slot: Slot,
/// The block root of the next block that needs to be added to fill in the history.
///
/// Zero if we know all blocks back to genesis.
pub oldest_block_parent: Hash256,
/// The slot from which historical states are available (>=).
pub state_upper_limit: Slot,
/// The slot before which historical states are available (<=).
pub state_lower_limit: Slot,
}
impl AnchorInfo {
    /// Returns true if the block backfill has completed, i.e. `oldest_block_slot` is 0.
    pub fn block_backfill_complete(&self) -> bool {
        let oldest = self.oldest_block_slot;
        oldest == 0
    }
}
/// `AnchorInfo` is stored in the `BeaconMeta` column as its SSZ encoding.
impl StoreItem for AnchorInfo {
    fn db_column() -> DBColumn {
        DBColumn::BeaconMeta
    }

    fn as_store_bytes(&self) -> Vec<u8> {
        let ssz_bytes = self.as_ssz_bytes();
        ssz_bytes
    }

    fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
        // Convert the SSZ decode error into the store's `Error` type.
        Self::from_ssz_bytes(bytes).map_err(Into::into)
    }
}

View File

@@ -0,0 +1,160 @@
//! Implementation of historic state reconstruction (given complete block history).
use crate::hot_cold_store::{HotColdDB, HotColdDBError};
use crate::{Error, ItemStore, KeyValueStore};
use itertools::{process_results, Itertools};
use slog::info;
use state_processing::{per_block_processing, per_slot_processing, BlockSignatureStrategy};
use std::sync::Arc;
use types::{EthSpec, Hash256};
impl<E, Hot, Cold> HotColdDB<E, Hot, Cold>
where
E: EthSpec,
Hot: KeyValueStore<E> + ItemStore<E>,
Cold: KeyValueStore<E> + ItemStore<E>,
{
/// Reconstruct the historic states between the anchor's `state_lower_limit` and
/// `state_upper_limit` by replaying blocks on top of the lower limit state.
///
/// Replayed states are staged for the freezer DB and committed in batches. After each commit
/// the anchor's `state_lower_limit` is raised to the committed slot, and once the slot
/// immediately below the upper limit is reached the anchor info is set to `None`.
///
/// Returns `Ok(())` immediately if there is no anchor info.
///
/// # Errors
///
/// - `Error::MissingHistoricBlocks` if `oldest_block_slot` is not 0 (blocks not yet backfilled).
/// - `HotColdDBError::MissingLowerLimitState` if no state can be loaded at the lower limit.
/// - `Error::BlockNotFound` if a block named by the block root iterator is absent.
/// - `HotColdDBError::BlockReplaySlotError` / `BlockReplayBlockError` if replay fails.
/// - `Error::StateReconstructionRootMismatch` if the final state's computed root differs from
///   the expected one.
/// - `Error::StateReconstructionDidNotComplete` if the iterator ends before the upper limit.
/// - `Error::SplitPointModified` if the split changed while reconstruction was running.
/// - Any error propagated from the database or the block root iterator.
pub fn reconstruct_historic_states(self: &Arc<Self>) -> Result<(), Error> {
let mut anchor = if let Some(anchor) = self.get_anchor_info() {
anchor
} else {
// Nothing to do, history is complete.
return Ok(());
};
// Check that all historic blocks are known.
if anchor.oldest_block_slot != 0 {
return Err(Error::MissingHistoricBlocks {
oldest_block_slot: anchor.oldest_block_slot,
});
}
info!(
self.log,
"Beginning historic state reconstruction";
"start_slot" => anchor.state_lower_limit,
);
let slots_per_restore_point = self.config.slots_per_restore_point;
// Iterate blocks from the state lower limit to the upper limit.
let lower_limit_slot = anchor.state_lower_limit;
// Snapshot the split now so that it can be compared against the split at the end.
let split = self.get_split_info();
// Load the restore point whose index corresponds to the upper limit slot.
let upper_limit_state = self.get_restore_point(
anchor.state_upper_limit.as_u64() / slots_per_restore_point,
&split,
)?;
let upper_limit_slot = upper_limit_state.slot();
// Use a dummy root, as we never read the block for the upper limit state.
let upper_limit_block_root = Hash256::repeat_byte(0xff);
let block_root_iter = Self::forwards_block_roots_iterator(
self.clone(),
lower_limit_slot,
upper_limit_state,
upper_limit_block_root,
&self.spec,
)?;
// The state to be advanced.
let mut state = self
.load_cold_state_by_slot(lower_limit_slot)?
.ok_or(HotColdDBError::MissingLowerLimitState(lower_limit_slot))?;
state.build_all_caches(&self.spec)?;
// `process_results` yields two layers of `Result`: the outer one carries errors from the
// block root iterator, the inner one is the closure's own result (hence the `??` below).
process_results(block_root_iter, |iter| -> Result<(), Error> {
// Database operations staged since the last commit.
let mut io_batch = vec![];
// State root claimed by the most recently applied block, if the previous slot had one.
let mut prev_state_root = None;
// Walk consecutive `(block_root, slot)` pairs so each slot can be compared to its predecessor.
for ((prev_block_root, _), (block_root, slot)) in iter.tuple_windows() {
// A slot whose block root repeats the previous slot's root has no block of its own.
let is_skipped_slot = prev_block_root == block_root;
let block = if is_skipped_slot {
None
} else {
Some(
self.get_block(&block_root)?
.ok_or(Error::BlockNotFound(block_root))?,
)
};
// Advance state to slot.
// `take()` hands over the previous block's state root (if any) and resets it to `None`.
per_slot_processing(&mut state, prev_state_root.take(), &self.spec)
.map_err(HotColdDBError::BlockReplaySlotError)?;
// Apply block.
if let Some(block) = block {
// Blocks are replayed with the `NoVerification` signature strategy.
per_block_processing(
&mut state,
&block,
Some(block_root),
BlockSignatureStrategy::NoVerification,
&self.spec,
)
.map_err(HotColdDBError::BlockReplayBlockError)?;
prev_state_root = Some(block.state_root());
}
// Use the state root claimed by the block when one was applied; otherwise (skipped slot)
// compute it from the state.
let state_root = prev_state_root
.ok_or(())
.or_else(|_| state.update_tree_hash_cache())?;
// Stage state for storage in freezer DB.
self.store_cold_state(&state_root, &state, &mut io_batch)?;
// If the slot lies on a restore point boundary (a multiple of `slots_per_restore_point`), or
// is the last slot before the upper limit, commit the batch and update the anchor.
if slot % slots_per_restore_point == 0 || slot + 1 == upper_limit_slot {
info!(
self.log,
"State reconstruction in progress";
"slot" => slot,
"remaining" => upper_limit_slot - 1 - slot
);
// Write out everything staged so far, leaving `io_batch` empty for the next batch.
self.cold_db.do_atomically(std::mem::take(&mut io_batch))?;
// Update anchor.
// Keep the current anchor as the expected previous value for the compare-and-set.
let old_anchor = Some(anchor.clone());
if slot + 1 == upper_limit_slot {
// The two limits have met in the middle! We're done!
// Perform one last integrity check on the state reached.
// When a block was applied at this slot, `state_root` is the root claimed by that block, so
// this compares the block's claim against the root of the replayed state.
let computed_state_root = state.update_tree_hash_cache()?;
if computed_state_root != state_root {
return Err(Error::StateReconstructionRootMismatch {
slot,
expected: state_root,
computed: computed_state_root,
});
}
// Replace the anchor with `None`: no gap in historic states remains.
self.compare_and_set_anchor_info(old_anchor, None)?;
return Ok(());
} else {
// The lower limit has been raised, store it.
anchor.state_lower_limit = slot;
self.compare_and_set_anchor_info(old_anchor, Some(anchor.clone()))?;
}
}
}
// Should always reach the `upper_limit_slot` and return early above.
Err(Error::StateReconstructionDidNotComplete)
})??;
// Check that the split point wasn't mutated during the state reconstruction process.
// It shouldn't have been, due to the serialization of requests through the store migrator,
// so this is just a paranoid check.
let latest_split = self.get_split_info();
if split != latest_split {
return Err(Error::SplitPointModified(latest_split.slot, split.slot));
}
Ok(())
}
}