mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-07 00:42:42 +00:00
Add duties pruning
This commit is contained in:
@@ -15,6 +15,9 @@ use types::{ChainSpec, Epoch, EthSpec, PublicKey, Slot};
|
|||||||
/// Delay this period of time after the slot starts. This allows the node to process the new slot.
|
/// Delay this period of time after the slot starts. This allows the node to process the new slot.
|
||||||
const TIME_DELAY_FROM_SLOT: Duration = Duration::from_millis(100);
|
const TIME_DELAY_FROM_SLOT: Duration = Duration::from_millis(100);
|
||||||
|
|
||||||
|
/// Remove any duties where the `duties_epoch < current_epoch - PRUNE_DEPTH`.
|
||||||
|
const PRUNE_DEPTH: u64 = 4;
|
||||||
|
|
||||||
type BaseHashMap = HashMap<PublicKey, HashMap<Epoch, ValidatorDuty>>;
|
type BaseHashMap = HashMap<PublicKey, HashMap<Epoch, ValidatorDuty>>;
|
||||||
|
|
||||||
enum InsertOutcome {
|
enum InsertOutcome {
|
||||||
@@ -108,7 +111,6 @@ impl DutiesStore {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: call this.
|
|
||||||
fn prune(&self, prior_to: Epoch) {
|
fn prune(&self, prior_to: Epoch) {
|
||||||
self.store
|
self.store
|
||||||
.write()
|
.write()
|
||||||
@@ -250,21 +252,39 @@ impl<T: SlotClock + Clone + 'static, E: EthSpec> DutiesService<T, E> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn do_update(self) -> impl Future<Item = (), Error = ()> {
|
fn do_update(self) -> impl Future<Item = (), Error = ()> {
|
||||||
let slots_per_epoch = E::slots_per_epoch();
|
|
||||||
let service_1 = self.clone();
|
let service_1 = self.clone();
|
||||||
let service_2 = self.clone();
|
let service_2 = self.clone();
|
||||||
let log = self.context.log.clone();
|
let service_3 = self.clone();
|
||||||
|
let log_1 = self.context.log.clone();
|
||||||
|
let log_2 = self.context.log.clone();
|
||||||
|
|
||||||
self.slot_clock
|
self.slot_clock
|
||||||
.now()
|
.now()
|
||||||
.ok_or_else(move || {
|
.ok_or_else(move || {
|
||||||
error!(log, "Duties manager failed to read slot clock");
|
error!(log_1, "Duties manager failed to read slot clock");
|
||||||
})
|
})
|
||||||
.into_future()
|
.into_future()
|
||||||
.map(move |slot| slot.epoch(slots_per_epoch))
|
.map(move |slot| {
|
||||||
|
let epoch = slot.epoch(E::slots_per_epoch());
|
||||||
|
|
||||||
|
if slot % E::slots_per_epoch() == 0 {
|
||||||
|
let prune_below = epoch - PRUNE_DEPTH;
|
||||||
|
|
||||||
|
trace!(
|
||||||
|
log_2,
|
||||||
|
"Pruning duties cache";
|
||||||
|
"pruning_below" => prune_below.as_u64(),
|
||||||
|
"current_epoch" => epoch.as_u64(),
|
||||||
|
);
|
||||||
|
|
||||||
|
service_1.store.prune(prune_below);
|
||||||
|
}
|
||||||
|
|
||||||
|
epoch
|
||||||
|
})
|
||||||
.and_then(move |epoch| {
|
.and_then(move |epoch| {
|
||||||
let log = service_1.context.log.clone();
|
let log = service_2.context.log.clone();
|
||||||
service_1.update_epoch(epoch).then(move |result| {
|
service_2.update_epoch(epoch).then(move |result| {
|
||||||
if let Err(e) = result {
|
if let Err(e) = result {
|
||||||
error!(
|
error!(
|
||||||
log,
|
log,
|
||||||
@@ -273,8 +293,8 @@ impl<T: SlotClock + Clone + 'static, E: EthSpec> DutiesService<T, E> {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
let log = service_2.context.log.clone();
|
let log = service_3.context.log.clone();
|
||||||
service_2.update_epoch(epoch + 1).map_err(move |e| {
|
service_3.update_epoch(epoch + 1).map_err(move |e| {
|
||||||
error!(
|
error!(
|
||||||
log,
|
log,
|
||||||
"Failed to get next epoch duties";
|
"Failed to get next epoch duties";
|
||||||
|
|||||||
Reference in New Issue
Block a user