Fix race condition in VC block proposal service (#1282)

Closes #918
Closes #923
This commit is contained in:
Michael Sproul
2020-07-07 14:03:21 +10:00
committed by GitHub
parent 5bc8fea2e0
commit 20a48df80a
12 changed files with 329 additions and 179 deletions

View File

@@ -1,20 +1,16 @@
use crate::{duties_service::DutiesService, validator_store::ValidatorStore};
use crate::validator_store::ValidatorStore;
use environment::RuntimeContext;
use futures::channel::mpsc::Receiver;
use futures::{StreamExt, TryFutureExt};
use remote_beacon_node::{PublishStatus, RemoteBeaconNode};
use slog::{crit, error, info, trace};
use slog::{crit, debug, error, info, trace, warn};
use slot_clock::SlotClock;
use std::ops::Deref;
use std::sync::Arc;
use tokio::time::{interval_at, Duration, Instant};
use types::{ChainSpec, EthSpec, PublicKey, Slot};
/// Delay this period of time after the slot starts. This allows the node to process the new slot.
const TIME_DELAY_FROM_SLOT: Duration = Duration::from_millis(100);
use types::{EthSpec, PublicKey, Slot};
/// Builds a `BlockService`.
pub struct BlockServiceBuilder<T, E: EthSpec> {
duties_service: Option<DutiesService<T, E>>,
validator_store: Option<ValidatorStore<T, E>>,
slot_clock: Option<Arc<T>>,
beacon_node: Option<RemoteBeaconNode<E>>,
@@ -24,7 +20,6 @@ pub struct BlockServiceBuilder<T, E: EthSpec> {
impl<T: SlotClock + 'static, E: EthSpec> BlockServiceBuilder<T, E> {
pub fn new() -> Self {
Self {
duties_service: None,
validator_store: None,
slot_clock: None,
beacon_node: None,
@@ -32,11 +27,6 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockServiceBuilder<T, E> {
}
}
pub fn duties_service(mut self, service: DutiesService<T, E>) -> Self {
self.duties_service = Some(service);
self
}
pub fn validator_store(mut self, store: ValidatorStore<T, E>) -> Self {
self.validator_store = Some(store);
self
@@ -60,9 +50,6 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockServiceBuilder<T, E> {
pub fn build(self) -> Result<BlockService<T, E>, String> {
Ok(BlockService {
inner: Arc::new(Inner {
duties_service: self
.duties_service
.ok_or_else(|| "Cannot build BlockService without duties_service")?,
validator_store: self
.validator_store
.ok_or_else(|| "Cannot build BlockService without validator_store")?,
@@ -82,7 +69,6 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockServiceBuilder<T, E> {
/// Helper to minimise `Arc` usage.
pub struct Inner<T, E: EthSpec> {
duties_service: DutiesService<T, E>,
validator_store: ValidatorStore<T, E>,
slot_clock: Arc<T>,
beacon_node: RemoteBeaconNode<E>,
@@ -110,77 +96,88 @@ impl<T, E: EthSpec> Deref for BlockService<T, E> {
}
}
/// Notification from the duties service that we should try to produce a block.
pub struct BlockServiceNotification {
pub slot: Slot,
pub block_proposers: Vec<PublicKey>,
}
impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
/// Starts the service that periodically attempts to produce blocks.
pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> {
pub fn start_update_service(
self,
notification_rx: Receiver<BlockServiceNotification>,
) -> Result<(), String> {
let log = self.context.log().clone();
let duration_to_next_slot = self
.slot_clock
.duration_to_next_slot()
.ok_or_else(|| "Unable to determine duration to next slot".to_string())?;
info!(
log,
"Block production service started";
"next_update_millis" => duration_to_next_slot.as_millis()
);
let mut interval = {
let slot_duration = Duration::from_millis(spec.milliseconds_per_slot);
// Note: interval_at panics if slot_duration = 0
interval_at(
Instant::now() + duration_to_next_slot + TIME_DELAY_FROM_SLOT,
slot_duration,
)
};
info!(log, "Block production service started");
let executor = self.inner.context.executor.clone();
let interval_fut = async move {
while interval.next().await.is_some() {
self.do_update().await.ok();
let block_service_fut = notification_rx.for_each(move |notif| {
let service = self.clone();
async move {
service.do_update(notif).await.ok();
}
};
});
executor.spawn(interval_fut, "block_service");
executor.spawn(block_service_fut, "block_service");
Ok(())
}
/// Attempt to produce a block for any block producers in the `ValidatorStore`.
async fn do_update(&self) -> Result<(), ()> {
async fn do_update(&self, notification: BlockServiceNotification) -> Result<(), ()> {
let log = self.context.log();
let slot = self.slot_clock.now().ok_or_else(move || {
crit!(log, "Duties manager failed to read slot clock");
})?;
if notification.slot != slot {
warn!(
log,
"Skipping block production for expired slot";
"current_slot" => slot.as_u64(),
"notification_slot" => notification.slot.as_u64(),
"info" => "Your machine could be overloaded"
);
return Ok(());
}
if slot == self.context.eth2_config.spec.genesis_slot {
debug!(
log,
"Not producing block at genesis slot";
"proposers" => format!("{:?}", notification.block_proposers),
);
return Ok(());
}
trace!(
log,
"Block service update started";
"slot" => slot.as_u64()
);
let iter = self.duties_service.block_producers(slot).into_iter();
let proposers = notification.block_proposers;
if iter.len() == 0 {
if proposers.is_empty() {
trace!(
log,
"No local block proposers for this slot";
"slot" => slot.as_u64()
)
} else if iter.len() > 1 {
} else if proposers.len() > 1 {
error!(
log,
"Multiple block proposers for this slot";
"action" => "producing blocks for all proposers",
"num_proposers" => iter.len(),
"num_proposers" => proposers.len(),
"slot" => slot.as_u64(),
)
}
iter.for_each(|validator_pubkey| {
proposers.into_iter().for_each(|validator_pubkey| {
let service = self.clone();
let log = log.clone();
self.inner.context.executor.runtime_handle().spawn(

View File

@@ -1,6 +1,9 @@
use crate::{is_synced::is_synced, validator_store::ValidatorStore};
use crate::{
block_service::BlockServiceNotification, is_synced::is_synced, validator_store::ValidatorStore,
};
use environment::RuntimeContext;
use futures::StreamExt;
use futures::channel::mpsc::Sender;
use futures::{SinkExt, StreamExt};
use parking_lot::RwLock;
use remote_beacon_node::{PublishStatus, RemoteBeaconNode};
use rest_types::{ValidatorDuty, ValidatorDutyBytes, ValidatorSubscription};
@@ -137,6 +140,8 @@ enum InsertOutcome {
NewEpoch,
/// The duties were identical to some already in the store.
Identical,
/// The duties informed us of new proposal slots but were otherwise identical.
NewProposalSlots,
/// There were duties for this validator and epoch in the store that were different to the ones
/// provided. The existing duties were replaced.
Replaced { should_resubscribe: bool },
@@ -149,10 +154,10 @@ impl InsertOutcome {
pub fn is_subscription_candidate(self) -> bool {
match self {
InsertOutcome::Replaced { should_resubscribe } => should_resubscribe,
InsertOutcome::NewValidator => true,
InsertOutcome::NewEpoch => true,
InsertOutcome::Identical => false,
InsertOutcome::Invalid => false,
InsertOutcome::NewValidator | InsertOutcome::NewEpoch => true,
InsertOutcome::Identical | InsertOutcome::Invalid | InsertOutcome::NewProposalSlots => {
false
}
}
}
}
@@ -171,8 +176,14 @@ impl DutiesStore {
.filter(|(_validator_pubkey, validator_map)| {
validator_map
.get(&epoch)
.map(|duties| !duties.duty.block_proposal_slots.is_empty())
.unwrap_or_else(|| false)
.map(|duties| {
duties
.duty
.block_proposal_slots
.as_ref()
.map_or(false, |proposal_slots| !proposal_slots.is_empty())
})
.unwrap_or(false)
})
.count()
}
@@ -191,7 +202,7 @@ impl DutiesStore {
.count()
}
fn block_producers(&self, slot: Slot, slots_per_epoch: u64) -> Vec<PublicKey> {
fn block_proposers(&self, slot: Slot, slots_per_epoch: u64) -> Vec<PublicKey> {
self.store
.read()
.iter()
@@ -201,7 +212,7 @@ impl DutiesStore {
let epoch = slot.epoch(slots_per_epoch);
validator_map.get(&epoch).and_then(|duties| {
if duties.duty.block_proposal_slots.contains(&slot) {
if duties.duty.block_proposal_slots.as_ref()?.contains(&slot) {
Some(duties.duty.validator_pubkey.clone())
} else {
None
@@ -260,8 +271,15 @@ impl DutiesStore {
if let Some(validator_map) = store.get_mut(&duties.duty.validator_pubkey) {
if let Some(known_duties) = validator_map.get_mut(&epoch) {
if known_duties.duty == duties.duty {
Ok(InsertOutcome::Identical)
if known_duties.duty.eq_ignoring_proposal_slots(&duties.duty) {
if known_duties.duty.block_proposal_slots == duties.duty.block_proposal_slots {
Ok(InsertOutcome::Identical)
} else if duties.duty.block_proposal_slots.is_some() {
known_duties.duty.block_proposal_slots = duties.duty.block_proposal_slots;
Ok(InsertOutcome::NewProposalSlots)
} else {
Ok(InsertOutcome::Invalid)
}
} else {
// Compute the selection proof.
duties.compute_selection_proof(validator_store)?;
@@ -388,8 +406,9 @@ pub struct Inner<T, E: EthSpec> {
/// Maintains a store of the duties for all voting validators in the `validator_store`.
///
/// Polls the beacon node at the start of each epoch, collecting duties for the current and next
/// epoch.
/// Polls the beacon node at the start of each slot, collecting duties for the current and next
/// epoch. The duties service notifies the block production service to run each time it completes,
/// so it *must* be run every slot.
pub struct DutiesService<T, E: EthSpec> {
inner: Arc<Inner<T, E>>,
}
@@ -430,8 +449,8 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
///
/// It is possible that multiple validators have an identical proposal slot, however that is
/// likely the result of heavy forking (lol) or inconsistent beacon node connections.
pub fn block_producers(&self, slot: Slot) -> Vec<PublicKey> {
self.store.block_producers(slot, E::slots_per_epoch())
pub fn block_proposers(&self, slot: Slot) -> Vec<PublicKey> {
self.store.block_proposers(slot, E::slots_per_epoch())
}
/// Returns all `ValidatorDuty` for the given `slot`.
@@ -440,7 +459,11 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
}
/// Start the service that periodically polls the beacon node for validator duties.
pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> {
pub fn start_update_service(
self,
mut block_service_tx: Sender<BlockServiceNotification>,
spec: &ChainSpec,
) -> Result<(), String> {
let duration_to_next_slot = self
.slot_clock
.duration_to_next_slot()
@@ -456,17 +479,19 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
};
// Run an immediate update before starting the updater service.
let duties_service = self.clone();
let mut block_service_tx_clone = block_service_tx.clone();
self.inner
.context
.executor
.runtime_handle()
.spawn(self.clone().do_update());
.spawn(async move { duties_service.do_update(&mut block_service_tx_clone).await });
let executor = self.inner.context.executor.clone();
let interval_fut = async move {
while interval.next().await.is_some() {
self.clone().do_update().await.ok();
self.clone().do_update(&mut block_service_tx).await;
}
};
@@ -476,42 +501,40 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
}
/// Attempt to download the duties of all managed validators for this epoch and the next.
async fn do_update(self) -> Result<(), ()> {
async fn do_update(self, block_service_tx: &mut Sender<BlockServiceNotification>) {
let log = self.context.log();
if !is_synced(&self.beacon_node, &self.slot_clock, None).await
&& !self.allow_unsynced_beacon_node
{
return Ok(());
return;
}
let current_epoch = self
.slot_clock
.now()
.ok_or_else(|| {
error!(log, "Duties manager failed to read slot clock");
})
.map(|slot| {
let epoch = slot.epoch(E::slots_per_epoch());
let slot = if let Some(slot) = self.slot_clock.now() {
slot
} else {
error!(log, "Duties manager failed to read slot clock");
return;
};
if slot % E::slots_per_epoch() == 0 {
let prune_below = epoch - PRUNE_DEPTH;
let current_epoch = slot.epoch(E::slots_per_epoch());
trace!(
log,
"Pruning duties cache";
"pruning_below" => prune_below.as_u64(),
"current_epoch" => epoch.as_u64(),
);
if slot % E::slots_per_epoch() == 0 {
let prune_below = current_epoch - PRUNE_DEPTH;
self.store.prune(prune_below);
}
trace!(
log,
"Pruning duties cache";
"pruning_below" => prune_below.as_u64(),
"current_epoch" => current_epoch.as_u64(),
);
epoch
})?;
self.store.prune(prune_below);
}
let result = self.clone().update_epoch(current_epoch).await;
if let Err(e) = result {
// Update duties for the current epoch, but keep running if there's an error:
// block production or the next epoch update could still succeed.
if let Err(e) = self.clone().update_epoch(current_epoch).await {
error!(
log,
"Failed to get current epoch duties";
@@ -519,18 +542,29 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
);
}
self.clone()
.update_epoch(current_epoch + 1)
// Notify the block service to produce a block.
if let Err(e) = block_service_tx
.send(BlockServiceNotification {
slot,
block_proposers: self.block_proposers(slot),
})
.await
.map_err(move |e| {
error!(
log,
"Failed to get next epoch duties";
"http_error" => format!("{:?}", e)
);
})?;
{
error!(
log,
"Failed to notify block service";
"error" => format!("{:?}", e)
);
};
Ok(())
// Update duties for the next epoch.
if let Err(e) = self.clone().update_epoch(current_epoch + 1).await {
error!(
log,
"Failed to get next epoch duties";
"http_error" => format!("{:?}", e)
);
}
}
/// Attempt to download the duties of all managed validators for the given `epoch`.
@@ -548,6 +582,7 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
let mut new_validator = 0;
let mut new_epoch = 0;
let mut new_proposal_slots = 0;
let mut identical = 0;
let mut replaced = 0;
let mut invalid = 0;
@@ -596,6 +631,7 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
);
new_validator += 1;
}
InsertOutcome::NewProposalSlots => new_proposal_slots += 1,
InsertOutcome::NewEpoch => new_epoch += 1,
InsertOutcome::Identical => identical += 1,
InsertOutcome::Replaced { .. } => replaced += 1,
@@ -634,6 +670,7 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
"Performed duties update";
"identical" => identical,
"new_epoch" => new_epoch,
"new_proposal_slots" => new_proposal_slots,
"new_validator" => new_validator,
"replaced" => replaced,
"epoch" => format!("{}", epoch)
@@ -643,7 +680,8 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
warn!(
log,
"Duties changed during routine update";
"info" => "Chain re-org likely occurred"
"info" => "Chain re-org likely occurred",
"replaced" => replaced,
)
}
@@ -688,8 +726,9 @@ fn duties_match_epoch(duties: &ValidatorDuty, epoch: Epoch, slots_per_epoch: u64
duties
.attestation_slot
.map_or(true, |slot| slot.epoch(slots_per_epoch) == epoch)
&& duties
.block_proposal_slots
.iter()
.all(|slot| slot.epoch(slots_per_epoch) == epoch)
&& duties.block_proposal_slots.as_ref().map_or(true, |slots| {
slots
.iter()
.all(|slot| slot.epoch(slots_per_epoch) == epoch)
})
}

View File

@@ -18,6 +18,7 @@ use config::SLASHING_PROTECTION_FILENAME;
use duties_service::{DutiesService, DutiesServiceBuilder};
use environment::RuntimeContext;
use fork_service::{ForkService, ForkServiceBuilder};
use futures::channel::mpsc;
use notifier::spawn_notifier;
use remote_beacon_node::RemoteBeaconNode;
use slog::{error, info, warn, Logger};
@@ -208,7 +209,6 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
.build()?;
let block_service = BlockServiceBuilder::new()
.duties_service(duties_service.clone())
.slot_clock(slot_clock.clone())
.validator_store(validator_store.clone())
.beacon_node(beacon_node.clone())
@@ -234,9 +234,15 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
}
pub fn start_service(&mut self) -> Result<(), String> {
// We use `SLOTS_PER_EPOCH` as the capacity of the block notification channel, because
// we don't except notifications to be delayed by more than a single slot, let alone a
// whole epoch!
let channel_capacity = T::slots_per_epoch() as usize;
let (block_service_tx, block_service_rx) = mpsc::channel(channel_capacity);
self.duties_service
.clone()
.start_update_service(&self.context.eth2_config.spec)
.start_update_service(block_service_tx, &self.context.eth2_config.spec)
.map_err(|e| format!("Unable to start duties service: {}", e))?;
self.fork_service
@@ -246,7 +252,7 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
self.block_service
.clone()
.start_update_service(&self.context.eth2_config.spec)
.start_update_service(block_service_rx)
.map_err(|e| format!("Unable to start block service: {}", e))?;
self.attestation_service