Add beacon.watch (#3362)

> This is currently a WIP and all features are subject to alteration or removal at any time.

## Overview

The successor to #2873.

Contains the backbone of `beacon.watch` including syncing code, the initial API, and several core database tables.

See `watch/README.md` for more information, requirements and usage.
This commit is contained in:
Mac L
2023-04-03 05:35:11 +00:00
parent 1e029ce538
commit 8630ddfec4
80 changed files with 7663 additions and 236 deletions

View File

@@ -0,0 +1,224 @@
use crate::database::{
schema::{suboptimal_attestations, validators},
watch_types::{WatchPK, WatchSlot},
Error, PgConn, MAX_SIZE_BATCH_INSERT,
};
use diesel::prelude::*;
use diesel::{Insertable, Queryable};
use log::debug;
use serde::{Deserialize, Serialize};
use std::time::Instant;
use types::Epoch;
/// A per-validator, per-epoch attestation record as exposed by the API.
///
/// `optimal` constructs one with all vote flags `true`; suboptimal rows are
/// converted from `WatchSuboptimalAttestation` via `to_attestation`.
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub struct WatchAttestation {
    /// Validator index.
    pub index: i32,
    /// Epoch the attestation belongs to.
    pub epoch: Epoch,
    /// Source vote flag.
    pub source: bool,
    /// Head vote flag.
    pub head: bool,
    /// Target vote flag.
    pub target: bool,
}
impl WatchAttestation {
    /// Builds the attestation for `index` at `epoch` with every vote flag
    /// (source, head and target) set to `true`.
    pub fn optimal(index: i32, epoch: Epoch) -> WatchAttestation {
        WatchAttestation {
            source: true,
            head: true,
            target: true,
            index,
            epoch,
        }
    }
}
/// Row type for the `suboptimal_attestations` database table.
///
/// Keyed by the epoch's start slot plus the validator index. A row is only
/// produced when at least one vote flag is `false` (see
/// `get_attestation_performances`).
#[derive(Debug, Queryable, Insertable, Serialize, Deserialize)]
#[diesel(table_name = suboptimal_attestations)]
pub struct WatchSuboptimalAttestation {
    /// First slot of the epoch this attestation belongs to.
    pub epoch_start_slot: WatchSlot,
    /// Validator index.
    pub index: i32,
    /// Source vote flag.
    pub source: bool,
    /// Head vote flag.
    pub head: bool,
    /// Target vote flag.
    pub target: bool,
}
impl WatchSuboptimalAttestation {
    /// Converts this database row into a `WatchAttestation`, recovering the
    /// epoch number from the stored `epoch_start_slot`.
    pub fn to_attestation(&self, slots_per_epoch: u64) -> WatchAttestation {
        let epoch = self.epoch_start_slot.epoch(slots_per_epoch);
        WatchAttestation {
            epoch,
            index: self.index,
            source: self.source,
            head: self.head,
            target: self.target,
        }
    }
}
/// Insert a batch of values into the `suboptimal_attestations` table.
///
/// Attestations technically occur per-slot, but we only store them per-epoch (keyed by the
/// epoch's `start_slot`). Consequently, if any slot in an epoch changes we need to resync the
/// whole epoch, as a 'suboptimal' attestation could now be 'optimal'.
///
/// This is handled in the update code, where in the case of a re-org, the affected epoch is
/// deleted completely.
///
/// On a conflict, it will do nothing.
pub fn insert_batch_suboptimal_attestations(
    conn: &mut PgConn,
    attestations: Vec<WatchSuboptimalAttestation>,
) -> Result<(), Error> {
    use self::suboptimal_attestations::dsl::*;

    let timer = Instant::now();

    // Insert in chunks of at most `MAX_SIZE_BATCH_INSERT` rows, accumulating
    // the number of rows actually written (conflicting rows are skipped).
    let count = attestations
        .chunks(MAX_SIZE_BATCH_INSERT)
        .try_fold(0usize, |total, batch| {
            diesel::insert_into(suboptimal_attestations)
                .values(batch)
                .on_conflict_do_nothing()
                .execute(conn)
                .map(|inserted| total + inserted)
        })?;

    let time_taken = timer.elapsed();
    debug!("Attestations inserted, count: {count}, time taken: {time_taken:?}");
    Ok(())
}
/// Selects the row from the `suboptimal_attestations` table where `epoch_start_slot` is minimum.
///
/// Returns `Ok(None)` when the table is empty.
pub fn get_lowest_attestation(
    conn: &mut PgConn,
) -> Result<Option<WatchSuboptimalAttestation>, Error> {
    use self::suboptimal_attestations::dsl::*;

    // `first` applies `LIMIT 1` itself, so an explicit `.limit(1)` is redundant.
    Ok(suboptimal_attestations
        .order_by(epoch_start_slot.asc())
        .first::<WatchSuboptimalAttestation>(conn)
        .optional()?)
}
/// Selects the row from the `suboptimal_attestations` table where `epoch_start_slot` is maximum.
///
/// Returns `Ok(None)` when the table is empty.
pub fn get_highest_attestation(
    conn: &mut PgConn,
) -> Result<Option<WatchSuboptimalAttestation>, Error> {
    use self::suboptimal_attestations::dsl::*;

    // `first` applies `LIMIT 1` itself, so an explicit `.limit(1)` is redundant.
    Ok(suboptimal_attestations
        .order_by(epoch_start_slot.desc())
        .first::<WatchSuboptimalAttestation>(conn)
        .optional()?)
}
/// Selects a single row from the `suboptimal_attestations` table corresponding to a given
/// `index_query` and `epoch_query`.
pub fn get_attestation_by_index(
    conn: &mut PgConn,
    index_query: i32,
    epoch_query: Epoch,
    slots_per_epoch: u64,
) -> Result<Option<WatchSuboptimalAttestation>, Error> {
    use self::suboptimal_attestations::dsl::*;

    let timer = Instant::now();

    // The table is keyed by the first slot of the epoch, so translate the epoch query
    // into that slot before filtering.
    let start_slot = WatchSlot::from_slot(epoch_query.start_slot(slots_per_epoch));
    let result = suboptimal_attestations
        .filter(index.eq(index_query))
        .filter(epoch_start_slot.eq(start_slot))
        .first::<WatchSuboptimalAttestation>(conn)
        .optional()?;

    let time_taken = timer.elapsed();
    debug!("Attestation requested for validator: {index_query}, epoch: {epoch_query}, time taken: {time_taken:?}");
    Ok(result)
}
/// Selects a single row from the `suboptimal_attestations` table corresponding
/// to a given `pubkey_query` and `epoch_query`.
#[allow(dead_code)]
pub fn get_attestation_by_pubkey(
    conn: &mut PgConn,
    pubkey_query: WatchPK,
    epoch_query: Epoch,
    slots_per_epoch: u64,
) -> Result<Option<WatchSuboptimalAttestation>, Error> {
    use self::suboptimal_attestations::dsl::*;
    use self::validators::dsl::{public_key, validators};

    let timer = Instant::now();
    let start_slot = WatchSlot::from_slot(epoch_query.start_slot(slots_per_epoch));

    // Join through the `validators` table so the attestation can be looked up by
    // public key rather than by validator index.
    let result = validators
        .inner_join(suboptimal_attestations)
        .select((epoch_start_slot, index, source, head, target))
        .filter(public_key.eq(pubkey_query))
        .filter(epoch_start_slot.eq(start_slot))
        .first::<WatchSuboptimalAttestation>(conn)
        .optional()?;

    let time_taken = timer.elapsed();
    debug!("Attestation requested for validator: {pubkey_query}, epoch: {epoch_query}, time taken: {time_taken:?}");
    Ok(result)
}
/// Selects `index` for all validators in the suboptimal_attestations table
/// that have `source == false` for the corresponding `epoch_start_slot_query`.
pub fn get_validators_missed_source(
    conn: &mut PgConn,
    epoch_start_slot_query: WatchSlot,
) -> Result<Vec<i32>, Error> {
    use self::suboptimal_attestations::dsl::*;

    let indices = suboptimal_attestations
        .filter(source.eq(false))
        .filter(epoch_start_slot.eq(epoch_start_slot_query))
        .select(index)
        .load::<i32>(conn)?;
    Ok(indices)
}
/// Selects `index` for all validators in the suboptimal_attestations table
/// that have `head == false` for the corresponding `epoch_start_slot_query`.
pub fn get_validators_missed_head(
    conn: &mut PgConn,
    epoch_start_slot_query: WatchSlot,
) -> Result<Vec<i32>, Error> {
    use self::suboptimal_attestations::dsl::*;

    let indices = suboptimal_attestations
        .filter(head.eq(false))
        .filter(epoch_start_slot.eq(epoch_start_slot_query))
        .select(index)
        .load::<i32>(conn)?;
    Ok(indices)
}
/// Selects `index` for all validators in the suboptimal_attestations table
/// that have `target == false` for the corresponding `epoch_start_slot_query`.
pub fn get_validators_missed_target(
    conn: &mut PgConn,
    epoch_start_slot_query: WatchSlot,
) -> Result<Vec<i32>, Error> {
    use self::suboptimal_attestations::dsl::*;

    let indices = suboptimal_attestations
        .filter(target.eq(false))
        .filter(epoch_start_slot.eq(epoch_start_slot_query))
        .select(index)
        .load::<i32>(conn)?;
    Ok(indices)
}
/// Selects all rows from the `suboptimal_attestations` table for the given
/// `epoch_start_slot_query`.
pub fn get_all_suboptimal_attestations_for_epoch(
    conn: &mut PgConn,
    epoch_start_slot_query: WatchSlot,
) -> Result<Vec<WatchSuboptimalAttestation>, Error> {
    use self::suboptimal_attestations::dsl::*;

    let rows: Vec<WatchSuboptimalAttestation> = suboptimal_attestations
        .filter(epoch_start_slot.eq(epoch_start_slot_query))
        .load(conn)?;
    Ok(rows)
}

View File

@@ -0,0 +1,56 @@
pub mod database;
pub mod server;
pub mod updater;
use crate::database::watch_types::WatchSlot;
use crate::updater::error::Error;
pub use database::{
get_all_suboptimal_attestations_for_epoch, get_attestation_by_index, get_attestation_by_pubkey,
get_highest_attestation, get_lowest_attestation, insert_batch_suboptimal_attestations,
WatchAttestation, WatchSuboptimalAttestation,
};
pub use server::{attestation_routes, blockprint_attestation_routes};
use eth2::BeaconNodeHttpClient;
use types::Epoch;
/// Sends a request to `lighthouse/analysis/attestation_performance`.
/// Formats the response into a vector of `WatchSuboptimalAttestation`.
///
/// Any attestations with `source == true && head == true && target == true` are ignored.
pub async fn get_attestation_performances(
    bn: &BeaconNodeHttpClient,
    start_epoch: Epoch,
    end_epoch: Epoch,
    slots_per_epoch: u64,
) -> Result<Vec<WatchSuboptimalAttestation>, Error> {
    let response = bn
        .get_lighthouse_analysis_attestation_performance(
            start_epoch,
            end_epoch,
            "global".to_string(),
        )
        .await?;

    let mut suboptimal = Vec::new();
    for validator in response {
        for (epoch, performance) in validator.epochs {
            // Only keep epochs where the validator was active AND at least one of the
            // three votes was missed — fully optimal attestations are not stored.
            if performance.active
                && !(performance.source && performance.head && performance.target)
            {
                suboptimal.push(WatchSuboptimalAttestation {
                    epoch_start_slot: WatchSlot::from_slot(
                        Epoch::new(epoch).start_slot(slots_per_epoch),
                    ),
                    index: validator.index as i32,
                    source: performance.source,
                    head: performance.head,
                    target: performance.target,
                })
            }
        }
    }
    Ok(suboptimal)
}

View File

@@ -0,0 +1,299 @@
use crate::database::{
get_canonical_slot, get_connection, get_validator_by_index, get_validator_by_public_key,
get_validators_clients_at_slot, get_validators_latest_proposer_info, PgPool, WatchPK,
WatchSlot,
};
use crate::blockprint::database::construct_validator_blockprints_at_slot;
use crate::server::Error;
use crate::suboptimal_attestations::database::{
get_all_suboptimal_attestations_for_epoch, get_attestation_by_index,
get_validators_missed_head, get_validators_missed_source, get_validators_missed_target,
WatchAttestation, WatchSuboptimalAttestation,
};
use axum::{extract::Path, routing::get, Extension, Json, Router};
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use types::Epoch;
// Will return Ok(None) if the epoch is not synced or if the validator does not exist.
// In the future it might be worth differentiating these events.
pub async fn get_validator_attestation(
Path((validator_query, epoch_query)): Path<(String, u64)>,
Extension(pool): Extension<PgPool>,
Extension(slots_per_epoch): Extension<u64>,
) -> Result<Json<Option<WatchAttestation>>, Error> {
let mut conn = get_connection(&pool).map_err(Error::Database)?;
let epoch = Epoch::new(epoch_query);
// Ensure the database has synced the target epoch.
if get_canonical_slot(
&mut conn,
WatchSlot::from_slot(epoch.end_slot(slots_per_epoch)),
)?
.is_none()
{
// Epoch is not fully synced.
return Ok(Json(None));
}
let index = if validator_query.starts_with("0x") {
let pubkey = WatchPK::from_str(&validator_query).map_err(|_| Error::BadRequest)?;
get_validator_by_public_key(&mut conn, pubkey)?
.ok_or(Error::NotFound)?
.index
} else {
i32::from_str(&validator_query).map_err(|_| Error::BadRequest)?
};
let attestation = if let Some(suboptimal_attestation) =
get_attestation_by_index(&mut conn, index, epoch, slots_per_epoch)?
{
Some(suboptimal_attestation.to_attestation(slots_per_epoch))
} else {
// Attestation was not in database. Check if the validator was active.
match get_validator_by_index(&mut conn, index)? {
Some(validator) => {
if let Some(activation_epoch) = validator.activation_epoch {
if activation_epoch <= epoch.as_u64() as i32 {
if let Some(exit_epoch) = validator.exit_epoch {
if exit_epoch > epoch.as_u64() as i32 {
// Validator is active and has not yet exited.
Some(WatchAttestation::optimal(index, epoch))
} else {
// Validator has exited.
None
}
} else {
// Validator is active and has not yet exited.
Some(WatchAttestation::optimal(index, epoch))
}
} else {
// Validator is not yet active.
None
}
} else {
// Validator is not yet active.
None
}
}
None => return Err(Error::Other("Validator index does not exist".to_string())),
}
};
Ok(Json(attestation))
}
/// Handler returning every suboptimal attestation recorded for `epoch`.
pub async fn get_all_validators_attestations(
    Path(epoch): Path<u64>,
    Extension(pool): Extension<PgPool>,
    Extension(slots_per_epoch): Extension<u64>,
) -> Result<Json<Vec<WatchSuboptimalAttestation>>, Error> {
    let mut conn = get_connection(&pool).map_err(Error::Database)?;
    let epoch_start_slot = WatchSlot::from_slot(Epoch::new(epoch).start_slot(slots_per_epoch));
    let attestations = get_all_suboptimal_attestations_for_epoch(&mut conn, epoch_start_slot)?;
    Ok(Json(attestations))
}
pub async fn get_validators_missed_vote(
Path((vote, epoch)): Path<(String, u64)>,
Extension(pool): Extension<PgPool>,
Extension(slots_per_epoch): Extension<u64>,
) -> Result<Json<Vec<i32>>, Error> {
let mut conn = get_connection(&pool).map_err(Error::Database)?;
let epoch_start_slot = WatchSlot::from_slot(Epoch::new(epoch).start_slot(slots_per_epoch));
match vote.to_lowercase().as_str() {
"source" => Ok(Json(get_validators_missed_source(
&mut conn,
epoch_start_slot,
)?)),
"head" => Ok(Json(get_validators_missed_head(
&mut conn,
epoch_start_slot,
)?)),
"target" => Ok(Json(get_validators_missed_target(
&mut conn,
epoch_start_slot,
)?)),
_ => Err(Error::BadRequest),
}
}
/// Handler returning, for each graffiti string, how many validators that missed
/// the given vote last proposed a block carrying that graffiti.
pub async fn get_validators_missed_vote_graffiti(
    Path((vote, epoch)): Path<(String, u64)>,
    Extension(pool): Extension<PgPool>,
    Extension(slots_per_epoch): Extension<u64>,
) -> Result<Json<HashMap<String, u64>>, Error> {
    let mut conn = get_connection(&pool).map_err(Error::Database)?;
    let Json(indices) = get_validators_missed_vote(
        Path((vote, epoch)),
        Extension(pool),
        Extension(slots_per_epoch),
    )
    .await?;

    // Count occurrences of each graffiti. The entry API performs a single lookup per
    // graffiti, replacing the previous `contains_key`/`insert`/`get_mut` triple lookup
    // (whose error branch was unreachable) and the intermediate `Vec<String>`.
    let mut result: HashMap<String, u64> = HashMap::new();
    for info in get_validators_latest_proposer_info(&mut conn, indices)?.values() {
        *result.entry(info.graffiti.clone()).or_insert(0) += 1;
    }

    Ok(Json(result))
}
/// Builds the router for the attestation endpoints that do not depend on Blockprint.
pub fn attestation_routes() -> Router {
    let router = Router::new();
    let router = router.route(
        "/v1/validators/:validator/attestation/:epoch",
        get(get_validator_attestation),
    );
    let router = router.route(
        "/v1/validators/all/attestation/:epoch",
        get(get_all_validators_attestations),
    );
    let router = router.route(
        "/v1/validators/missed/:vote/:epoch",
        get(get_validators_missed_vote),
    );
    router.route(
        "/v1/validators/missed/:vote/:epoch/graffiti",
        get(get_validators_missed_vote_graffiti),
    )
}
/// The functions below are dependent on Blockprint and if it is disabled, the endpoints will be
/// disabled.
pub async fn get_clients_missed_vote(
Path((vote, epoch)): Path<(String, u64)>,
Extension(pool): Extension<PgPool>,
Extension(slots_per_epoch): Extension<u64>,
) -> Result<Json<HashMap<String, u64>>, Error> {
let mut conn = get_connection(&pool).map_err(Error::Database)?;
let Json(indices) = get_validators_missed_vote(
Path((vote, epoch)),
Extension(pool),
Extension(slots_per_epoch),
)
.await?;
// All validators which missed the vote.
let indices_map = indices.into_iter().collect::<HashSet<i32>>();
let target_slot = WatchSlot::from_slot(Epoch::new(epoch).start_slot(slots_per_epoch));
// All validators.
let client_map =
construct_validator_blockprints_at_slot(&mut conn, target_slot, slots_per_epoch)?;
let mut result = HashMap::new();
for index in indices_map {
if let Some(print) = client_map.get(&index) {
if !result.contains_key(print) {
result.insert(print.clone(), 0);
}
*result
.get_mut(print)
.ok_or_else(|| Error::Other("An unexpected error occurred".to_string()))? += 1;
}
}
Ok(Json(result))
}
/// Handler returning, per client, the percentage of that client's validators
/// which missed the given vote during `epoch`.
pub async fn get_clients_missed_vote_percentages(
    Path((vote, epoch)): Path<(String, u64)>,
    Extension(pool): Extension<PgPool>,
    Extension(slots_per_epoch): Extension<u64>,
) -> Result<Json<HashMap<String, f64>>, Error> {
    let Json(clients_counts) = get_clients_missed_vote(
        Path((vote, epoch)),
        Extension(pool.clone()),
        Extension(slots_per_epoch),
    )
    .await?;

    let target_slot = WatchSlot::from_slot(Epoch::new(epoch).start_slot(slots_per_epoch));
    let mut conn = get_connection(&pool)?;
    let totals = get_validators_clients_at_slot(&mut conn, target_slot, slots_per_epoch)?;

    let mut result = HashMap::new();
    for (client, count) in clients_counts.iter() {
        let client_total: f64 = *totals
            .get(client)
            .ok_or_else(|| Error::Other("Client type mismatch".to_string()))?
            as f64;
        // `client_total` should never be `0`, but if it is, return `0` instead of `inf`.
        let percentage = if client_total == 0.0 {
            0.0
        } else {
            *count as f64 / client_total * 100.0
        };
        result.insert(client.to_string(), percentage);
    }
    Ok(Json(result))
}
/// Handler returning, per client, that client's share (as a percentage) of all
/// missed votes for `epoch`.
pub async fn get_clients_missed_vote_percentages_relative(
    Path((vote, epoch)): Path<(String, u64)>,
    Extension(pool): Extension<PgPool>,
    Extension(slots_per_epoch): Extension<u64>,
) -> Result<Json<HashMap<String, f64>>, Error> {
    let Json(clients_counts) = get_clients_missed_vote(
        Path((vote, epoch)),
        Extension(pool),
        Extension(slots_per_epoch),
    )
    .await?;

    // Total number of missed votes across all clients (iterator sum instead of a
    // manual accumulation loop).
    let total: u64 = clients_counts.values().sum();

    let mut result = HashMap::new();
    for (client, count) in clients_counts.iter() {
        // `total` should never be `0`, but if it is, return `0` instead of `inf`.
        if total == 0 {
            result.insert(client.to_string(), 0.0);
        } else {
            let percentage: f64 = *count as f64 / total as f64 * 100.0;
            result.insert(client.to_string(), percentage);
        }
    }
    Ok(Json(result))
}
/// Builds the router for the Blockprint-dependent attestation endpoints.
pub fn blockprint_attestation_routes() -> Router {
    let router = Router::new();
    let router = router.route(
        "/v1/clients/missed/:vote/:epoch",
        get(get_clients_missed_vote),
    );
    let router = router.route(
        "/v1/clients/missed/:vote/:epoch/percentages",
        get(get_clients_missed_vote_percentages),
    );
    router.route(
        "/v1/clients/missed/:vote/:epoch/percentages/relative",
        get(get_clients_missed_vote_percentages_relative),
    )
}

View File

@@ -0,0 +1,236 @@
use crate::database::{self, Error as DbError};
use crate::updater::{Error, UpdateHandler};
use crate::suboptimal_attestations::get_attestation_performances;
use eth2::types::EthSpec;
use log::{debug, error, warn};
/// Maximum number of epochs requested from the `attestation_performance` endpoint in one call.
const MAX_SIZE_SINGLE_REQUEST_ATTESTATIONS: u64 = 50;
impl<T: EthSpec> UpdateHandler<T> {
    /// Forward fills the `suboptimal_attestations` table starting from the entry with the highest
    /// slot.
    ///
    /// It constructs a request to the `attestation_performance` API endpoint with:
    /// `start_epoch` -> highest completely filled epoch + 1 (or epoch of lowest canonical slot)
    /// `end_epoch` -> epoch of highest canonical slot
    ///
    /// It will resync the latest epoch if it is not fully filled but will not overwrite existing
    /// values unless there is a re-org.
    /// That is, `if highest_filled_slot % slots_per_epoch != 31`.
    ///
    /// In the event the most recent epoch has no suboptimal attestations, it will attempt to
    /// resync that epoch. The odds of this occurring on mainnet are vanishingly small so it is
    /// not accounted for.
    ///
    /// Request range will not exceed `MAX_SIZE_SINGLE_REQUEST_ATTESTATIONS`.
    pub async fn fill_suboptimal_attestations(&mut self) -> Result<(), Error> {
        let mut conn = database::get_connection(&self.pool)?;

        // Bail out if attestation tracking is disabled in the config.
        let highest_filled_slot_opt = if self.config.attestations {
            database::get_highest_attestation(&mut conn)?
                .map(|attestation| attestation.epoch_start_slot.as_slot())
        } else {
            return Err(Error::NotEnabled("attestations".to_string()));
        };

        let start_epoch = if let Some(highest_filled_slot) = highest_filled_slot_opt {
            if highest_filled_slot % self.slots_per_epoch == self.slots_per_epoch.saturating_sub(1)
            {
                // The whole epoch is filled so we can begin syncing the next one.
                highest_filled_slot.epoch(self.slots_per_epoch) + 1
            } else {
                // The epoch is only partially synced. Try to sync it fully.
                highest_filled_slot.epoch(self.slots_per_epoch)
            }
        } else {
            // No rows present in the `suboptimal_attestations` table. Use `canonical_slots`
            // instead.
            if let Some(lowest_canonical_slot) = database::get_lowest_canonical_slot(&mut conn)? {
                lowest_canonical_slot
                    .slot
                    .as_slot()
                    .epoch(self.slots_per_epoch)
            } else {
                // There are no slots in the database, do not fill the `suboptimal_attestations`
                // table.
                warn!("Refusing to fill the `suboptimal_attestations` table as there are no slots in the database");
                return Ok(());
            }
        };

        if let Some(highest_canonical_slot) =
            database::get_highest_canonical_slot(&mut conn)?.map(|slot| slot.slot.as_slot())
        {
            let mut end_epoch = highest_canonical_slot.epoch(self.slots_per_epoch);

            // The `lighthouse/analysis/attestation_performance` endpoint can only retrieve
            // attestations which are more than 1 epoch old.
            // We assume that `highest_canonical_slot` is near the head of the chain.
            end_epoch = end_epoch.saturating_sub(2_u64);

            // If end_epoch == 0 then the chain just started so we need to wait until
            // `current_epoch >= 2`.
            if end_epoch == 0 {
                debug!("Chain just begun, refusing to sync attestations");
                return Ok(());
            }

            if start_epoch > end_epoch {
                debug!("Attestations are up to date with the head of the database");
                return Ok(());
            }

            // Ensure the size of the request does not exceed the maximum allowed value.
            if start_epoch < end_epoch.saturating_sub(MAX_SIZE_SINGLE_REQUEST_ATTESTATIONS) {
                end_epoch = start_epoch + MAX_SIZE_SINGLE_REQUEST_ATTESTATIONS
            }

            if let Some(lowest_canonical_slot) =
                database::get_lowest_canonical_slot(&mut conn)?.map(|slot| slot.slot.as_slot())
            {
                let mut attestations = get_attestation_performances(
                    &self.bn,
                    start_epoch,
                    end_epoch,
                    self.slots_per_epoch,
                )
                .await?;

                // Only insert attestations with corresponding `canonical_slot`s.
                attestations.retain(|attestation| {
                    attestation.epoch_start_slot.as_slot() >= lowest_canonical_slot
                        && attestation.epoch_start_slot.as_slot() <= highest_canonical_slot
                });

                database::insert_batch_suboptimal_attestations(&mut conn, attestations)?;
            } else {
                return Err(Error::Database(DbError::Other(
                    "Database did not return a lowest canonical slot when one exists".to_string(),
                )));
            }
        } else {
            // There are no slots in the `canonical_slots` table, but there are entries in the
            // `suboptimal_attestations` table. This is a critical failure. It usually means
            // someone has manually tampered with the database tables and should not occur during
            // normal operation.
            error!("Database is corrupted. Please re-sync the database");
            return Err(Error::Database(DbError::DatabaseCorrupted));
        }

        Ok(())
    }

    /// Backfill the `suboptimal_attestations` table starting from the entry with the lowest slot.
    ///
    /// It constructs a request to the `attestation_performance` API endpoint with:
    /// `start_epoch` -> epoch of the lowest `canonical_slot`.
    /// `end_epoch` -> epoch of the lowest filled `suboptimal_attestation` - 1 (or epoch of
    /// highest canonical slot)
    ///
    /// It will resync the lowest epoch if it is not fully filled.
    /// That is, `if lowest_filled_slot % slots_per_epoch != 0`
    ///
    /// In the event there are no suboptimal attestations present in the lowest epoch, it will
    /// attempt to resync the epoch. The odds of this occurring on mainnet are vanishingly small
    /// so it is not accounted for.
    ///
    /// Request range will not exceed `MAX_SIZE_SINGLE_REQUEST_ATTESTATIONS`.
    pub async fn backfill_suboptimal_attestations(&mut self) -> Result<(), Error> {
        let mut conn = database::get_connection(&self.pool)?;
        let max_attestation_backfill = self.config.max_backfill_size_epochs;

        // Get the slot of the lowest entry in the `suboptimal_attestations` table.
        let lowest_filled_slot_opt = if self.config.attestations {
            database::get_lowest_attestation(&mut conn)?
                .map(|attestation| attestation.epoch_start_slot.as_slot())
        } else {
            return Err(Error::NotEnabled("attestations".to_string()));
        };

        let end_epoch = if let Some(lowest_filled_slot) = lowest_filled_slot_opt {
            if lowest_filled_slot % self.slots_per_epoch == 0 {
                // The lowest epoch is fully synced; continue backwards from the one before it.
                lowest_filled_slot
                    .epoch(self.slots_per_epoch)
                    .saturating_sub(1_u64)
            } else {
                // The epoch is only partially synced. Try to sync it fully.
                lowest_filled_slot.epoch(self.slots_per_epoch)
            }
        } else {
            // No entries in the `suboptimal_attestations` table. Use `canonical_slots` instead.
            if let Some(highest_canonical_slot) =
                database::get_highest_canonical_slot(&mut conn)?.map(|slot| slot.slot.as_slot())
            {
                // Subtract 2 since `end_epoch` must be less than the current epoch - 1.
                // We assume that `highest_canonical_slot` is near the head of the chain.
                highest_canonical_slot
                    .epoch(self.slots_per_epoch)
                    .saturating_sub(2_u64)
            } else {
                // There are no slots in the database, do not backfill the
                // `suboptimal_attestations` table.
                warn!("Refusing to backfill attestations as there are no slots in the database");
                return Ok(());
            }
        };

        // Once epoch 0 is the target, everything has been backfilled.
        if end_epoch == 0 {
            debug!("Attestations backfill is complete");
            return Ok(());
        }

        if let Some(lowest_canonical_slot) =
            database::get_lowest_canonical_slot(&mut conn)?.map(|slot| slot.slot.as_slot())
        {
            let mut start_epoch = lowest_canonical_slot.epoch(self.slots_per_epoch);

            if start_epoch > end_epoch {
                debug!("Attestations are up to date with the base of the database");
                return Ok(());
            }

            // Ensure the request range does not exceed `max_attestation_backfill` or
            // `MAX_SIZE_SINGLE_REQUEST_ATTESTATIONS`.
            if start_epoch < end_epoch.saturating_sub(max_attestation_backfill) {
                start_epoch = end_epoch.saturating_sub(max_attestation_backfill)
            }
            if start_epoch < end_epoch.saturating_sub(MAX_SIZE_SINGLE_REQUEST_ATTESTATIONS) {
                start_epoch = end_epoch.saturating_sub(MAX_SIZE_SINGLE_REQUEST_ATTESTATIONS)
            }

            if let Some(highest_canonical_slot) =
                database::get_highest_canonical_slot(&mut conn)?.map(|slot| slot.slot.as_slot())
            {
                let mut attestations = get_attestation_performances(
                    &self.bn,
                    start_epoch,
                    end_epoch,
                    self.slots_per_epoch,
                )
                .await?;

                // Only insert `suboptimal_attestations` with corresponding `canonical_slots`.
                attestations.retain(|attestation| {
                    attestation.epoch_start_slot.as_slot() >= lowest_canonical_slot
                        && attestation.epoch_start_slot.as_slot() <= highest_canonical_slot
                });

                database::insert_batch_suboptimal_attestations(&mut conn, attestations)?;
            } else {
                return Err(Error::Database(DbError::Other(
                    "Database did not return a lowest slot when one exists".to_string(),
                )));
            }
        } else {
            // There are no slots in the `canonical_slot` table, but there are entries in the
            // `suboptimal_attestations` table. This is a critical failure. It usually means
            // someone has manually tampered with the database tables and should not occur during
            // normal operation.
            error!("Database is corrupted. Please re-sync the database");
            return Err(Error::Database(DbError::DatabaseCorrupted));
        }

        Ok(())
    }
}