mirror of
https://github.com/sigp/lighthouse.git
synced 2026-07-05 05:44:30 +00:00
Use data column batch verification consistently (#6851)
Resolve a `TODO(das)` to use KZG batch verification in `put_rpc_custody_columns` Uses `verify_kzg_for_data_column_list_with_scoring` in all paths that send more than one column. To use batch verification and have attributability of which peer is sending a bad column. Needs to move `verify_kzg_for_data_column_list_with_scoring` into the type's module to convert to the KZG verified type.
This commit is contained in:
@@ -27,8 +27,8 @@ mod overflow_lru_cache;
|
|||||||
mod state_lru_cache;
|
mod state_lru_cache;
|
||||||
|
|
||||||
use crate::data_column_verification::{
|
use crate::data_column_verification::{
|
||||||
verify_kzg_for_data_column, verify_kzg_for_data_column_list, CustodyDataColumn,
|
verify_kzg_for_data_column_list_with_scoring, CustodyDataColumn, GossipVerifiedDataColumn,
|
||||||
GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn,
|
KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn,
|
||||||
};
|
};
|
||||||
use crate::metrics::{
|
use crate::metrics::{
|
||||||
KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS, KZG_DATA_COLUMN_RECONSTRUCTION_FAILURES,
|
KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS, KZG_DATA_COLUMN_RECONSTRUCTION_FAILURES,
|
||||||
@@ -230,19 +230,14 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
|
|||||||
block_root: Hash256,
|
block_root: Hash256,
|
||||||
custody_columns: DataColumnSidecarList<T::EthSpec>,
|
custody_columns: DataColumnSidecarList<T::EthSpec>,
|
||||||
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
|
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
|
||||||
// TODO(das): report which column is invalid for proper peer scoring
|
// Attributes fault to the specific peer that sent an invalid column
|
||||||
// TODO(das): batch KZG verification here, but fallback into checking each column
|
let kzg_verified_columns = KzgVerifiedDataColumn::from_batch(custody_columns, &self.kzg)
|
||||||
// individually to report which column(s) are invalid.
|
.map_err(AvailabilityCheckError::InvalidColumn)?;
|
||||||
let verified_custody_columns = custody_columns
|
|
||||||
|
let verified_custody_columns = kzg_verified_columns
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|column| {
|
.map(KzgVerifiedCustodyDataColumn::from_asserted_custody)
|
||||||
let index = column.index;
|
.collect::<Vec<_>>();
|
||||||
Ok(KzgVerifiedCustodyDataColumn::from_asserted_custody(
|
|
||||||
KzgVerifiedDataColumn::new(column, &self.kzg)
|
|
||||||
.map_err(|e| AvailabilityCheckError::InvalidColumn(index, e))?,
|
|
||||||
))
|
|
||||||
})
|
|
||||||
.collect::<Result<Vec<_>, AvailabilityCheckError>>()?;
|
|
||||||
|
|
||||||
self.availability_cache.put_kzg_verified_data_columns(
|
self.availability_cache.put_kzg_verified_data_columns(
|
||||||
block_root,
|
block_root,
|
||||||
@@ -365,7 +360,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
|
|||||||
.iter()
|
.iter()
|
||||||
.map(|custody_column| custody_column.as_data_column()),
|
.map(|custody_column| custody_column.as_data_column()),
|
||||||
&self.kzg,
|
&self.kzg,
|
||||||
)?;
|
)
|
||||||
|
.map_err(AvailabilityCheckError::InvalidColumn)?;
|
||||||
Ok(MaybeAvailableBlock::Available(AvailableBlock {
|
Ok(MaybeAvailableBlock::Available(AvailableBlock {
|
||||||
block_root,
|
block_root,
|
||||||
block,
|
block,
|
||||||
@@ -432,8 +428,9 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
|
|||||||
|
|
||||||
// verify kzg for all data columns at once
|
// verify kzg for all data columns at once
|
||||||
if !all_data_columns.is_empty() {
|
if !all_data_columns.is_empty() {
|
||||||
// TODO: Need to also attribute which specific block is faulty
|
// Attributes fault to the specific peer that sent an invalid column
|
||||||
verify_kzg_for_data_column_list_with_scoring(all_data_columns.iter(), &self.kzg)?;
|
verify_kzg_for_data_column_list_with_scoring(all_data_columns.iter(), &self.kzg)
|
||||||
|
.map_err(AvailabilityCheckError::InvalidColumn)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
for block in blocks {
|
for block in blocks {
|
||||||
@@ -716,32 +713,6 @@ async fn availability_cache_maintenance_service<T: BeaconChainTypes>(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn verify_kzg_for_data_column_list_with_scoring<'a, E: EthSpec, I>(
|
|
||||||
data_column_iter: I,
|
|
||||||
kzg: &'a Kzg,
|
|
||||||
) -> Result<(), AvailabilityCheckError>
|
|
||||||
where
|
|
||||||
I: Iterator<Item = &'a Arc<DataColumnSidecar<E>>> + Clone,
|
|
||||||
{
|
|
||||||
let Err(batch_err) = verify_kzg_for_data_column_list(data_column_iter.clone(), kzg) else {
|
|
||||||
return Ok(());
|
|
||||||
};
|
|
||||||
|
|
||||||
let data_columns = data_column_iter.collect::<Vec<_>>();
|
|
||||||
// Find which column is invalid. If len is 1 or 0 continue to default case below.
|
|
||||||
// If len > 1 at least one column MUST fail.
|
|
||||||
if data_columns.len() > 1 {
|
|
||||||
for data_column in data_columns {
|
|
||||||
if let Err(e) = verify_kzg_for_data_column(data_column.clone(), kzg) {
|
|
||||||
return Err(AvailabilityCheckError::InvalidColumn(data_column.index, e));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// len 0 should never happen
|
|
||||||
Err(AvailabilityCheckError::InvalidColumn(0, batch_err))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A fully available block that is ready to be imported into fork choice.
|
/// A fully available block that is ready to be imported into fork choice.
|
||||||
#[derive(Clone, Debug, PartialEq)]
|
#[derive(Clone, Debug, PartialEq)]
|
||||||
pub struct AvailableBlock<E: EthSpec> {
|
pub struct AvailableBlock<E: EthSpec> {
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ use types::{BeaconStateError, ColumnIndex, Hash256};
|
|||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
InvalidBlobs(KzgError),
|
InvalidBlobs(KzgError),
|
||||||
InvalidColumn(ColumnIndex, KzgError),
|
InvalidColumn(Vec<(ColumnIndex, KzgError)>),
|
||||||
ReconstructColumnsError(KzgError),
|
ReconstructColumnsError(KzgError),
|
||||||
KzgCommitmentMismatch {
|
KzgCommitmentMismatch {
|
||||||
blob_commitment: KzgCommitment,
|
blob_commitment: KzgCommitment,
|
||||||
|
|||||||
@@ -239,6 +239,18 @@ impl<E: EthSpec> KzgVerifiedDataColumn<E> {
|
|||||||
pub fn new(data_column: Arc<DataColumnSidecar<E>>, kzg: &Kzg) -> Result<Self, KzgError> {
|
pub fn new(data_column: Arc<DataColumnSidecar<E>>, kzg: &Kzg) -> Result<Self, KzgError> {
|
||||||
verify_kzg_for_data_column(data_column, kzg)
|
verify_kzg_for_data_column(data_column, kzg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn from_batch(
|
||||||
|
data_columns: Vec<Arc<DataColumnSidecar<E>>>,
|
||||||
|
kzg: &Kzg,
|
||||||
|
) -> Result<Vec<Self>, Vec<(ColumnIndex, KzgError)>> {
|
||||||
|
verify_kzg_for_data_column_list_with_scoring(data_columns.iter(), kzg)?;
|
||||||
|
Ok(data_columns
|
||||||
|
.into_iter()
|
||||||
|
.map(|column| Self { data: column })
|
||||||
|
.collect())
|
||||||
|
}
|
||||||
|
|
||||||
pub fn to_data_column(self) -> Arc<DataColumnSidecar<E>> {
|
pub fn to_data_column(self) -> Arc<DataColumnSidecar<E>> {
|
||||||
self.data
|
self.data
|
||||||
}
|
}
|
||||||
@@ -378,6 +390,38 @@ where
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Complete kzg verification for a list of `DataColumnSidecar`s.
|
||||||
|
///
|
||||||
|
/// If there's at least one invalid column, it re-verifies all columns individually to identify the
|
||||||
|
/// first column that is invalid. This is necessary to attribute fault to the specific peer that
|
||||||
|
/// sent bad data. The re-verification cost should not be significant. If a peer sends invalid data it
|
||||||
|
/// will be quickly banned.
|
||||||
|
pub fn verify_kzg_for_data_column_list_with_scoring<'a, E: EthSpec, I>(
|
||||||
|
data_column_iter: I,
|
||||||
|
kzg: &'a Kzg,
|
||||||
|
) -> Result<(), Vec<(ColumnIndex, KzgError)>>
|
||||||
|
where
|
||||||
|
I: Iterator<Item = &'a Arc<DataColumnSidecar<E>>> + Clone,
|
||||||
|
{
|
||||||
|
if verify_kzg_for_data_column_list(data_column_iter.clone(), kzg).is_ok() {
|
||||||
|
return Ok(());
|
||||||
|
};
|
||||||
|
|
||||||
|
// Find all columns that are invalid and identify by index. If we hit this condition there
|
||||||
|
// should be at least one invalid column
|
||||||
|
let errors = data_column_iter
|
||||||
|
.filter_map(|data_column| {
|
||||||
|
if let Err(e) = verify_kzg_for_data_column(data_column.clone(), kzg) {
|
||||||
|
Some((data_column.index, e))
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
Err(errors)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn validate_data_column_sidecar_for_gossip<T: BeaconChainTypes, O: ObservationStrategy>(
|
pub fn validate_data_column_sidecar_for_gossip<T: BeaconChainTypes, O: ObservationStrategy>(
|
||||||
data_column: Arc<DataColumnSidecar<T::EthSpec>>,
|
data_column: Arc<DataColumnSidecar<T::EthSpec>>,
|
||||||
subnet: u64,
|
subnet: u64,
|
||||||
|
|||||||
@@ -36,6 +36,7 @@ use beacon_chain::data_availability_checker::{
|
|||||||
use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError};
|
use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError};
|
||||||
pub use common::RequestState;
|
pub use common::RequestState;
|
||||||
use fnv::FnvHashMap;
|
use fnv::FnvHashMap;
|
||||||
|
use itertools::Itertools;
|
||||||
use lighthouse_network::service::api_types::SingleLookupReqId;
|
use lighthouse_network::service::api_types::SingleLookupReqId;
|
||||||
use lighthouse_network::{PeerAction, PeerId};
|
use lighthouse_network::{PeerAction, PeerId};
|
||||||
use lru_cache::LRUTimeCache;
|
use lru_cache::LRUTimeCache;
|
||||||
@@ -644,8 +645,15 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
|||||||
// but future errors may follow the same pattern. Generalize this
|
// but future errors may follow the same pattern. Generalize this
|
||||||
// pattern with https://github.com/sigp/lighthouse/pull/6321
|
// pattern with https://github.com/sigp/lighthouse/pull/6321
|
||||||
BlockError::AvailabilityCheck(
|
BlockError::AvailabilityCheck(
|
||||||
AvailabilityCheckError::InvalidColumn(index, _),
|
AvailabilityCheckError::InvalidColumn(errors),
|
||||||
) => peer_group.of_index(index as usize).collect(),
|
) => errors
|
||||||
|
.iter()
|
||||||
|
// Collect all peers that sent a column that was invalid. Must
|
||||||
|
// run .unique as a single peer can send multiple invalid
|
||||||
|
// columns. Penalize once to avoid insta-bans
|
||||||
|
.flat_map(|(index, _)| peer_group.of_index((*index) as usize))
|
||||||
|
.unique()
|
||||||
|
.collect(),
|
||||||
_ => peer_group.all().collect(),
|
_ => peer_group.all().collect(),
|
||||||
};
|
};
|
||||||
for peer in peers_to_penalize {
|
for peer in peers_to_penalize {
|
||||||
|
|||||||
Reference in New Issue
Block a user