Address queue congestion in migrator (#1923)

## Issue Addressed

*Should* address #1917

## Proposed Changes

Stops the `BackgroundMigrator` rx channel from backing up with big `BeaconState` messages.
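In short (the full change is in the diff below), the notification sent to the migrator no longer carries a `BeaconState` at all; it carries only the finalized state root and checkpoint, and the migration thread loads the state back out of the hot database when it actually runs. A rough sketch of the before/after shape of the message, using simplified stand-in types rather than the real `BeaconStateHash`, `Checkpoint` and `BeaconState`:

```rust
// Sketch only: simplified stand-ins for the real Lighthouse types
// (BeaconStateHash, Checkpoint, BeaconState, Arc<HeadTracker>, ...).
type Hash256 = [u8; 32];

struct Checkpoint {
    epoch: u64,
    root: Hash256,
}

// Before: every queued notification kept a full `BeaconState` alive in
// memory for as long as it sat in the channel.
struct OldMigrationNotification<State> {
    finalized_state_root: Hash256,
    finalized_state: State,
    finalized_checkpoint: Checkpoint,
}

// After: only the root and checkpoint are sent; the migration thread loads
// the state from the hot database (`db.get_state(...)`) when it runs.
struct MigrationNotification {
    finalized_state_root: Hash256,
    finalized_checkpoint: Checkpoint,
}
```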

Looking at some logs from my Medalla node, we can see a discrepancy between the head finalized epoch and the finalized epoch the migrator is pruning for (the head is finalized at epoch 4033, while the migrator is only just starting to prune for epoch 2193):

```
Nov 17 16:50:21.606 DEBG Head beacon block                       slot: 129214, root: 0xbc7a…0b99, finalized_epoch: 4033, finalized_root: 0xf930…6562, justified_epoch: 4035, justified_root: 0x206b…9321, service: beacon
Nov 17 16:50:21.626 DEBG Batch processed                         service: sync, processed_blocks: 43, last_block_slot: 129214, chain: 8274002112260436595, first_block_slot: 129153, batch_epoch: 4036
Nov 17 16:50:21.626 DEBG Chain advanced                          processing_target: 4036, new_start: 4036, previous_start: 4034, chain: 8274002112260436595, service: sync
Nov 17 16:50:22.162 DEBG Completed batch received                awaiting_batches: 5, blocks: 47, epoch: 4048, chain: 8274002112260436595, service: sync
Nov 17 16:50:22.162 DEBG Requesting batch                        start_slot: 129601, end_slot: 129664, downloaded: 0, processed: 0, state: Downloading(16Uiu2HAmG3C3t1McaseReECjAF694tjVVjkDoneZEbxNhWm1nZaT, 0 blocks, 1273), epoch: 4050, chain: 8274002112260436595, service: sync
Nov 17 16:50:22.654 DEBG Database compaction complete            service: beacon
Nov 17 16:50:22.655 INFO Starting database pruning               new_finalized_epoch: 2193, old_finalized_epoch: 2192, service: beacon
```

I believe this indicates that the migrator rx channel has a backed-up queue of `MigrationNotification` items, each of which contains a full `BeaconState`.
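On top of slimming the message down, the migration thread now drains the channel every time it wakes up and only acts on the notification with the highest finalized epoch (see `spawn_thread` in the diff below), so a burst of finalizations collapses into one migration instead of a queue of stale ones. A self-contained sketch of that drain-and-keep-best pattern with `std::sync::mpsc`; only the `finalized_epoch` field mirrors the real `MigrationNotification`, the rest is simplified:

```rust
use std::sync::mpsc;
use std::thread;

struct Notif {
    finalized_epoch: u64,
}

fn main() {
    let (tx, rx) = mpsc::channel::<Notif>();

    let worker = thread::spawn(move || {
        // Block until at least one notification arrives...
        while let Ok(first) = rx.recv() {
            // ...then drain anything else that queued up in the meantime,
            // keeping only the notification with the highest finalized epoch.
            let best = rx.try_iter().fold(first, |best, other| {
                if other.finalized_epoch > best.finalized_epoch {
                    other
                } else {
                    best
                }
            });
            println!("running migration for epoch {}", best.finalized_epoch);
        }
        // `recv` returns Err once every sender is dropped, ending the loop.
    });

    // Simulate a burst of finalization notifications arriving faster than
    // the migrator can process them.
    for epoch in [4030u64, 4031, 4032, 4033] {
        tx.send(Notif { finalized_epoch: epoch }).unwrap();
    }
    drop(tx);
    worker.join().unwrap();
}
```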

## TODO

- [x] Remove finalized state requirement for op-pool
Commit 103103e72e by Paul Hauner, 2020-11-17 23:11:26 +00:00 (parent a60ab4eff2).
3 changed files with 108 additions and 86 deletions.


@@ -1926,25 +1926,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
};
let new_finalized_checkpoint = new_head.beacon_state.finalized_checkpoint;
// State root of the finalized state on the epoch boundary, NOT the state
// of the finalized block. We need to use an iterator in case the state is beyond
// the reach of the new head's `state_roots` array.
let new_finalized_slot = new_finalized_checkpoint
.epoch
.start_slot(T::EthSpec::slots_per_epoch());
let new_finalized_state_root = process_results(
StateRootsIterator::new(self.store.clone(), &new_head.beacon_state),
|mut iter| {
iter.find_map(|(state_root, slot)| {
if slot == new_finalized_slot {
Some(state_root)
} else {
None
}
})
},
)?
.ok_or_else(|| Error::MissingFinalizedStateRoot(new_finalized_slot))?;
// It is an error to try to update to a head with a lesser finalized epoch.
if new_finalized_checkpoint.epoch < old_finalized_checkpoint.epoch {
@@ -1991,7 +1972,39 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
});
if new_finalized_checkpoint.epoch != old_finalized_checkpoint.epoch {
self.after_finalization(new_finalized_checkpoint, new_finalized_state_root)?;
// Due to race conditions, it's technically possible that the head we load here is
// different to the one earlier in this function.
//
// Since the head can't move backwards in terms of finalized epoch, we can only load a
// head with a *later* finalized state. There is no harm in this.
let head = self
.canonical_head
.try_read_for(HEAD_LOCK_TIMEOUT)
.ok_or_else(|| Error::CanonicalHeadLockTimeout)?;
// State root of the finalized state on the epoch boundary, NOT the state
// of the finalized block. We need to use an iterator in case the state is beyond
// the reach of the new head's `state_roots` array.
let new_finalized_slot = head
.beacon_state
.finalized_checkpoint
.epoch
.start_slot(T::EthSpec::slots_per_epoch());
let new_finalized_state_root = process_results(
StateRootsIterator::new(self.store.clone(), &head.beacon_state),
|mut iter| {
iter.find_map(|(state_root, slot)| {
if slot == new_finalized_slot {
Some(state_root)
} else {
None
}
})
},
)?
.ok_or_else(|| Error::MissingFinalizedStateRoot(new_finalized_slot))?;
self.after_finalization(&head.beacon_state, new_finalized_state_root)?;
}
let _ = self.event_handler.register(EventKind::BeaconHeadChanged {
@@ -2072,10 +2085,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// Performs pruning and finality-based optimizations.
fn after_finalization(
&self,
new_finalized_checkpoint: Checkpoint,
head_state: &BeaconState<T::EthSpec>,
new_finalized_state_root: Hash256,
) -> Result<(), Error> {
self.fork_choice.write().prune()?;
let new_finalized_checkpoint = head_state.finalized_checkpoint;
self.observed_block_producers.prune(
new_finalized_checkpoint
@@ -2097,20 +2111,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
);
});
let finalized_state = self
.get_state(&new_finalized_state_root, None)?
.ok_or_else(|| Error::MissingBeaconState(new_finalized_state_root))?;
self.op_pool.prune_all(
&finalized_state,
self.epoch()?,
self.head_info()?.fork,
&self.spec,
);
self.op_pool.prune_all(head_state, self.epoch()?);
self.store_migrator.process_finalization(
new_finalized_state_root.into(),
finalized_state,
new_finalized_checkpoint,
self.head_tracker.clone(),
)?;


@@ -3,7 +3,7 @@ use crate::errors::BeaconChainError;
use crate::head_tracker::{HeadTracker, SszHeadTracker};
use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT};
use parking_lot::Mutex;
use slog::{debug, info, warn, Logger};
use slog::{debug, error, info, warn, Logger};
use std::collections::{HashMap, HashSet};
use std::mem;
use std::sync::{mpsc, Arc};
@@ -30,12 +30,7 @@ const COMPACTION_FINALITY_DISTANCE: u64 = 1024;
pub struct BackgroundMigrator<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
db: Arc<HotColdDB<E, Hot, Cold>>,
#[allow(clippy::type_complexity)]
tx_thread: Option<
Mutex<(
mpsc::Sender<MigrationNotification<E>>,
thread::JoinHandle<()>,
)>,
>,
tx_thread: Option<Mutex<(mpsc::Sender<MigrationNotification>, thread::JoinHandle<()>)>>,
/// Genesis block root, for persisting the `PersistedBeaconChain`.
genesis_block_root: Hash256,
log: Logger,
@@ -78,9 +73,8 @@ pub enum PruningError {
}
/// Message sent to the migration thread containing the information it needs to run.
pub struct MigrationNotification<E: EthSpec> {
pub struct MigrationNotification {
finalized_state_root: BeaconStateHash,
finalized_state: BeaconState<E>,
finalized_checkpoint: Checkpoint,
head_tracker: Arc<HeadTracker>,
genesis_block_root: Hash256,
@@ -115,13 +109,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
pub fn process_finalization(
&self,
finalized_state_root: BeaconStateHash,
finalized_state: BeaconState<E>,
finalized_checkpoint: Checkpoint,
head_tracker: Arc<HeadTracker>,
) -> Result<(), BeaconChainError> {
let notif = MigrationNotification {
finalized_state_root,
finalized_state,
finalized_checkpoint,
head_tracker,
genesis_block_root: self.genesis_block_root,
@@ -161,13 +153,21 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
}
/// Perform the actual work of `process_finalization`.
fn run_migration(
db: Arc<HotColdDB<E, Hot, Cold>>,
notif: MigrationNotification<E>,
log: &Logger,
) {
fn run_migration(db: Arc<HotColdDB<E, Hot, Cold>>, notif: MigrationNotification, log: &Logger) {
let finalized_state_root = notif.finalized_state_root;
let finalized_state = notif.finalized_state;
let finalized_state = match db.get_state(&finalized_state_root.into(), None) {
Ok(Some(state)) => state,
other => {
error!(
log,
"Migrator failed to load state";
"state_root" => ?finalized_state_root,
"error" => ?other
);
return;
}
};
let old_finalized_checkpoint = match Self::prune_abandoned_forks(
db.clone(),
@@ -231,13 +231,22 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
fn spawn_thread(
db: Arc<HotColdDB<E, Hot, Cold>>,
log: Logger,
) -> (
mpsc::Sender<MigrationNotification<E>>,
thread::JoinHandle<()>,
) {
) -> (mpsc::Sender<MigrationNotification>, thread::JoinHandle<()>) {
let (tx, rx) = mpsc::channel();
let thread = thread::spawn(move || {
while let Ok(notif) = rx.recv() {
// Read the rest of the messages in the channel, ultimately choosing the `notif`
// with the highest finalized epoch.
let notif = rx
.try_iter()
.fold(notif, |best, other: MigrationNotification| {
if other.finalized_checkpoint.epoch > best.finalized_checkpoint.epoch {
other
} else {
best
}
});
Self::run_migration(db.clone(), notif, &log);
}
});