lighthouse/beacon_node/http_api/src/publish_attestations.rs
ethDreamer c52c598f69 Electra: Remaining Consensus Data Structures (#5712)
* Attestation superstruct changes for EIP 7549 (#5644)

* update

* experiment

* superstruct changes

* revert

* superstruct changes

* fix tests

* indexed attestation

* indexed attestation superstruct

* updated TODOs

* `superstruct` the `AttesterSlashing` (#5636)

* `superstruct` Attester Fork Variants

* Push a little further

* Deal with Encode / Decode of AttesterSlashing

* not so sure about this..

* Stop Encode/Decode Bounds from Propagating Out

* Tons of Changes..

* More Conversions to AttestationRef

* Add AsReference trait (#15)

* Add AsReference trait

* Fix some snafus

* Got it Compiling! :D

* Got Tests Building

* Get beacon chain tests compiling

---------

Co-authored-by: Michael Sproul <micsproul@gmail.com>

* Merge remote-tracking branch 'upstream/unstable' into electra_attestation_changes

* Make EF Tests Fork-Agnostic (#5713)

* Finish EF Test Fork Agnostic (#5714)

* Superstruct `AggregateAndProof` (#5715)

* Upgrade `superstruct` to `0.8.0`

* superstruct `AggregateAndProof`

* Merge remote-tracking branch 'sigp/unstable' into electra_attestation_changes

* cargo fmt

* Merge pull request #5726 from realbigsean/electra_attestation_changes

Merge unstable into Electra attestation changes

* EIP7549 `get_attestation_indices` (#5657)

* get attesting indices electra impl

* fmt

* get tests to pass

* fmt

* fix some beacon chain tests

* fmt

* fix slasher test

* fmt got me again

* fix more tests

* fix tests

* Some small changes (#5739)

* cargo fmt (#5740)

* Sketch op pool changes

* fix get attesting indices (#5742)

* fix get attesting indices

* better errors

* fix compile

* only get committee index once

* Ef test fixes (#5753)

* attestation related ef test fixes

* delete commented out stuff

* Fix Aggregation Pool for Electra (#5754)

* Fix Aggregation Pool for Electra

* Remove Outdated Interface

* fix ssz (#5755)

* Get `electra_op_pool` up to date (#5756)

* fix get attesting indices (#5742)

* fix get attesting indices

* better errors

* fix compile

* only get committee index once

* Ef test fixes (#5753)

* attestation related ef test fixes

* delete commented out stuff

* Fix Aggregation Pool for Electra (#5754)

* Fix Aggregation Pool for Electra

* Remove Outdated Interface

* fix ssz (#5755)

---------

Co-authored-by: realbigsean <sean@sigmaprime.io>

* Revert "Get `electra_op_pool` up to date (#5756)" (#5757)

This reverts commit ab9e58aa3d.

* Merge branch 'electra_attestation_changes' of https://github.com/sigp/lighthouse into electra_op_pool

* Compute on chain aggregate impl (#5752)

* add compute_on_chain_agg impl to op pool changes

* fmt

* get op pool tests to pass

* update the naive agg pool interface (#5760)

* Fix bugs in cross-committee aggregation

* Add comment to max cover optimisation

* Fix assert

* Merge pull request #5749 from sigp/electra_op_pool

Optimise Electra op pool aggregation

* update committee offset

* Fix Electra Fork Choice Tests (#5764)

* Subscribe to the correct subnets for electra attestations (#5782)

* subscribe to the correct att subnets for electra

* subscribe to the correct att subnets for electra

* cargo fmt

* fix slashing handling

* Merge remote-tracking branch 'upstream/unstable'

* Send unagg attestation based on fork

* Publish all aggregates

* just one more check bro plz..

* Merge pull request #5832 from ethDreamer/electra_attestation_changes_merge_unstable

Merge `unstable` into `electra_attestation_changes`

* Merge pull request #5835 from realbigsean/fix-validator-logic

Fix validator logic

* Merge pull request #5816 from realbigsean/electra-attestation-slashing-handling

Electra slashing handling

* Electra attestation changes rm decode impl (#5856)

* Remove Crappy Decode impl for Attestation

* Remove Inefficient Attestation Decode impl

* Implement Schema Upgrade / Downgrade

* Update beacon_node/beacon_chain/src/schema_change/migration_schema_v20.rs

Co-authored-by: Michael Sproul <micsproul@gmail.com>

---------

Co-authored-by: Michael Sproul <micsproul@gmail.com>

* Fix failing attestation tests and misc electra attestation cleanup (#5810)

* - get attestation related beacon chain tests to pass
- observed attestations are now keyed off of data + committee index
- rename op pool attestationref to compactattestationref
- remove unwraps in agg pool and use options instead
- cherry pick some changes from ef-tests-electra

* cargo fmt

* fix failing test

* Revert dockerfile changes

* make committee_index return option

* function args shouldnt be a ref to attestation ref

* fmt

* fix dup imports

---------

Co-authored-by: realbigsean <seananderson33@GMAIL.com>

* fix some todos (#5817)

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into electra_attestation_changes

* add consolidations to merkle calc for inclusion proof

* Remove Duplicate KZG Commitment Merkle Proof Code (#5874)

* Remove Duplicate KZG Commitment Merkle Proof Code

* s/tree_lists/fields/

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into electra_attestation_changes

* fix compile

* Fix slasher tests (#5906)

* Fix electra tests

* Add electra attestations to double vote tests

* Update superstruct to 0.8

* Merge remote-tracking branch 'origin/unstable' into electra_attestation_changes

* Small cleanup in slasher tests

* Clean up Electra observed aggregates (#5929)

* Use consistent key in observed_attestations

* Remove unwraps from observed aggregates

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into electra_attestation_changes

* De-dup attestation constructor logic

* Remove unwraps in Attestation construction

* Dedup match_attestation_data

* Remove outdated TODO

* Use ForkName Ord in fork-choice tests

* Use ForkName Ord in BeaconBlockBody

* Make to_electra not fallible

* Remove TestRandom impl for IndexedAttestation

* Remove IndexedAttestation faulty Decode impl

* Drop TestRandom impl

* Add PendingAttestationInElectra

* Indexed att on disk (#35)

* indexed att on disk

* fix lints

* Update slasher/src/migrate.rs

Co-authored-by: ethDreamer <37123614+ethDreamer@users.noreply.github.com>

---------

Co-authored-by: Lion - dapplion <35266934+dapplion@users.noreply.github.com>
Co-authored-by: ethDreamer <37123614+ethDreamer@users.noreply.github.com>

* add electra fork enabled fn to ForkName impl (#36)

* add electra fork enabled fn to ForkName impl

* remove inadvertent file

* Update common/eth2/src/types.rs

Co-authored-by: ethDreamer <37123614+ethDreamer@users.noreply.github.com>

* Dedup attestation constructor logic in attester cache

* Use if let Ok for committee_bits

* Dedup Attestation constructor code

* Diff reduction in tests

* Fix beacon_chain tests

* Diff reduction

* Use Ord for ForkName in pubsub

* Resolve into_attestation_and_indices todo

* Remove stale TODO

* Fix beacon_chain tests

* Test spec invariant

* Use electra_enabled in pubsub

* Remove get_indexed_attestation_from_signed_aggregate

* Use ok_or instead of if let else

* committees are sorted

* remove dup method `get_indexed_attestation_from_committees`

* Merge pull request #5940 from dapplion/electra_attestation_changes_lionreview

Electra attestations #5712 review

* update default persisted op pool deserialization

* ensure aggregate and proof uses serde untagged on ref

* Fork aware ssz static attestation tests

* Electra attestation changes from Lions review (#5971)

* dedup/cleanup and remove unneeded hashset use

* remove irrelevant TODOs

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into electra_attestation_changes

* Electra attestation changes sean review (#5972)

* instantiate empty bitlist in unreachable code

* clean up error conversion

* fork enabled bool cleanup

* remove a couple todos

* return bools instead of options in `aggregate` and use the result

* delete commented out code

* use map macros in simple transformations

* remove signers_disjoint_from

* get ef tests compiling

* get ef tests compiling

* update intentionally excluded files

* Avoid changing slasher schema for Electra

* Delete slasher schema v4

* Fix clippy

* Fix compilation of beacon_chain tests

* Update database.rs

* Add electra lightclient types

* Update slasher/src/database.rs

* fix imports

* Merge pull request #5980 from dapplion/electra-lightclient

Add electra lightclient types

* Merge pull request #5975 from michaelsproul/electra-slasher-no-migration

Avoid changing slasher schema for Electra

* Update beacon_node/beacon_chain/src/attestation_verification.rs

* Update beacon_node/beacon_chain/src/attestation_verification.rs
2024-06-24 21:08:07 +00:00

//! Import attestations and publish them to the network.
//!
//! This module gracefully handles attestations to unknown blocks by requeuing them and then
//! efficiently waiting for them to finish reprocessing (using an async yield).
//!
//! The following comments relate to the handling of duplicate attestations (relocated here during
//! refactoring):
//!
//! Skip to the next attestation since an attestation for this
//! validator is already known in this epoch.
//!
//! There's little value for the network in validating a second
//! attestation for the same validator since it is either:
//!
//! 1. A duplicate.
//! 2. Slashable.
//! 3. Invalid.
//!
//! We are likely to get duplicates in the case where a VC is using
//! fallback BNs. If the first BN actually publishes some/all of a
//! batch of attestations but fails to respond in a timely fashion,
//! the VC is likely to try publishing the attestations on another
//! BN. That second BN may have already seen the attestations from
//! the first BN and therefore indicate that the attestations are
//! "already seen". An attestation that has already been seen has
//! been published on the network so there's no actual error from
//! the perspective of the user.
//!
//! It's better to prevent slashable attestations from ever
//! appearing on the network than trying to slash validators,
//! especially those validators connected to the local API.
//!
//! There might be *some* value in determining that this attestation
//! is invalid, but since a valid attestation already exists, it
//! appears that this validator is capable of producing valid
//! attestations and there's no immediate cause for concern.
use crate::task_spawner::{Priority, TaskSpawner};
use beacon_chain::{
    validator_monitor::timestamp_now, AttestationError, BeaconChain, BeaconChainError,
    BeaconChainTypes,
};
use beacon_processor::work_reprocessing_queue::{QueuedUnaggregate, ReprocessQueueMessage};
use eth2::types::Failure;
use lighthouse_network::PubsubMessage;
use network::NetworkMessage;
use slog::{debug, error, warn, Logger};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{
    mpsc::{Sender, UnboundedSender},
    oneshot,
};
use types::Attestation;
// Error variants are only used in `Debug` and considered `dead_code` by the compiler.
#[derive(Debug)]
enum Error {
    Validation(AttestationError),
    Publication,
    ForkChoice(#[allow(dead_code)] BeaconChainError),
    AggregationPool(#[allow(dead_code)] AttestationError),
    ReprocessDisabled,
    ReprocessFull,
    ReprocessTimeout,
}

enum PublishAttestationResult {
    Success,
    AlreadyKnown,
    Reprocessing(oneshot::Receiver<Result<(), Error>>),
    Failure(Error),
}
fn verify_and_publish_attestation<T: BeaconChainTypes>(
    chain: &Arc<BeaconChain<T>>,
    attestation: &Attestation<T::EthSpec>,
    seen_timestamp: Duration,
    network_tx: &UnboundedSender<NetworkMessage<T::EthSpec>>,
    log: &Logger,
) -> Result<(), Error> {
    let attestation = chain
        .verify_unaggregated_attestation_for_gossip(attestation, None)
        .map_err(Error::Validation)?;

    // Publish.
    network_tx
        .send(NetworkMessage::Publish {
            messages: vec![PubsubMessage::Attestation(Box::new((
                attestation.subnet_id(),
                attestation.attestation().clone_as_attestation(),
            )))],
        })
        .map_err(|_| Error::Publication)?;

    // Notify the validator monitor.
    chain
        .validator_monitor
        .read()
        .register_api_unaggregated_attestation(
            seen_timestamp,
            attestation.indexed_attestation(),
            &chain.slot_clock,
        );

    let fc_result = chain.apply_attestation_to_fork_choice(&attestation);
    let naive_aggregation_result = chain.add_to_naive_aggregation_pool(&attestation);

    if let Err(e) = &fc_result {
        warn!(
            log,
            "Attestation invalid for fork choice";
            "err" => ?e,
        );
    }
    if let Err(e) = &naive_aggregation_result {
        warn!(
            log,
            "Attestation invalid for aggregation";
            "err" => ?e
        );
    }

    if let Err(e) = fc_result {
        Err(Error::ForkChoice(e))
    } else if let Err(e) = naive_aggregation_result {
        Err(Error::AggregationPool(e))
    } else {
        Ok(())
    }
}
pub async fn publish_attestations<T: BeaconChainTypes>(
    task_spawner: TaskSpawner<T::EthSpec>,
    chain: Arc<BeaconChain<T>>,
    attestations: Vec<Attestation<T::EthSpec>>,
    network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
    reprocess_send: Option<Sender<ReprocessQueueMessage>>,
    log: Logger,
) -> Result<(), warp::Rejection> {
    // Collect metadata about attestations which we'll use to report failures. We need to
    // move the `attestations` vec into the blocking task, so this small overhead is unavoidable.
    let attestation_metadata = attestations
        .iter()
        .map(|att| (att.data().slot, att.committee_index()))
        .collect::<Vec<_>>();

    // Gossip validate and publish attestations that can be immediately processed.
    let seen_timestamp = timestamp_now();
    let inner_log = log.clone();
    let mut prelim_results = task_spawner
        .blocking_task(Priority::P0, move || {
            Ok(attestations
                .into_iter()
                .map(|attestation| {
                    match verify_and_publish_attestation(
                        &chain,
                        &attestation,
                        seen_timestamp,
                        &network_tx,
                        &inner_log,
                    ) {
                        Ok(()) => PublishAttestationResult::Success,
                        Err(Error::Validation(AttestationError::UnknownHeadBlock {
                            beacon_block_root,
                        })) => {
                            let Some(reprocess_tx) = &reprocess_send else {
                                return PublishAttestationResult::Failure(Error::ReprocessDisabled);
                            };
                            // Re-process.
                            let (tx, rx) = oneshot::channel();
                            let reprocess_chain = chain.clone();
                            let reprocess_network_tx = network_tx.clone();
                            let reprocess_log = inner_log.clone();
                            let reprocess_fn = move || {
                                let result = verify_and_publish_attestation(
                                    &reprocess_chain,
                                    &attestation,
                                    seen_timestamp,
                                    &reprocess_network_tx,
                                    &reprocess_log,
                                );
                                // Ignore failure on the oneshot that reports the result. This
                                // shouldn't happen unless some catastrophe befalls the waiting
                                // thread which causes it to drop.
                                let _ = tx.send(result);
                            };
                            let reprocess_msg =
                                ReprocessQueueMessage::UnknownBlockUnaggregate(QueuedUnaggregate {
                                    beacon_block_root,
                                    process_fn: Box::new(reprocess_fn),
                                });
                            if reprocess_tx.try_send(reprocess_msg).is_err() {
                                PublishAttestationResult::Failure(Error::ReprocessFull)
                            } else {
                                PublishAttestationResult::Reprocessing(rx)
                            }
                        }
                        Err(Error::Validation(AttestationError::PriorAttestationKnown {
                            ..
                        })) => PublishAttestationResult::AlreadyKnown,
                        Err(e) => PublishAttestationResult::Failure(e),
                    }
                })
                .map(Some)
                .collect::<Vec<_>>())
        })
        .await?;

    // Asynchronously wait for re-processing of attestations to unknown blocks. This avoids blocking
    // any of the beacon processor workers while we wait for reprocessing.
    let (reprocess_indices, reprocess_futures): (Vec<_>, Vec<_>) = prelim_results
        .iter_mut()
        .enumerate()
        .filter_map(|(i, opt_result)| {
            if let Some(PublishAttestationResult::Reprocessing(..)) = &opt_result {
                let PublishAttestationResult::Reprocessing(rx) = opt_result.take()? else {
                    // Unreachable.
                    return None;
                };
                Some((i, rx))
            } else {
                None
            }
        })
        .unzip();
    let reprocess_results = futures::future::join_all(reprocess_futures).await;

    // Join everything back together and construct a response.
    // This part should be quick so we just stay in the Tokio executor's async task.
    for (i, reprocess_result) in reprocess_indices.into_iter().zip(reprocess_results) {
        let Some(result_entry) = prelim_results.get_mut(i) else {
            error!(
                log,
                "Unreachable case in attestation publishing";
                "case" => "prelim out of bounds",
                "request_index" => i,
            );
            continue;
        };
        *result_entry = Some(match reprocess_result {
            Ok(Ok(())) => PublishAttestationResult::Success,
            // Attestation failed processing on re-process.
            Ok(Err(Error::Validation(AttestationError::PriorAttestationKnown { .. }))) => {
                PublishAttestationResult::AlreadyKnown
            }
            Ok(Err(e)) => PublishAttestationResult::Failure(e),
            // Oneshot was dropped, indicating that the attestation either timed out in the
            // reprocess queue or was dropped due to some error.
            Err(_) => PublishAttestationResult::Failure(Error::ReprocessTimeout),
        });
    }

    // Construct the response.
    let mut failures = vec![];
    let mut num_already_known = 0;

    for (index, result) in prelim_results.iter().enumerate() {
        match result {
            Some(PublishAttestationResult::Success) => {}
            Some(PublishAttestationResult::AlreadyKnown) => num_already_known += 1,
            Some(PublishAttestationResult::Failure(e)) => {
                if let Some((slot, committee_index)) = attestation_metadata.get(index) {
                    error!(
                        log,
                        "Failure verifying attestation for gossip";
                        "error" => ?e,
                        "request_index" => index,
                        "committee_index" => committee_index,
                        "attestation_slot" => slot,
                    );
                    failures.push(Failure::new(index, format!("{e:?}")));
                } else {
                    error!(
                        log,
                        "Unreachable case in attestation publishing";
                        "case" => "out of bounds",
                        "request_index" => index
                    );
                    failures.push(Failure::new(index, "metadata logic error".into()));
                }
            }
            Some(PublishAttestationResult::Reprocessing(_)) => {
                error!(
                    log,
                    "Unreachable case in attestation publishing";
                    "case" => "reprocessing",
                    "request_index" => index
                );
                failures.push(Failure::new(index, "reprocess logic error".into()));
            }
            None => {
                error!(
                    log,
                    "Unreachable case in attestation publishing";
                    "case" => "result is None",
                    "request_index" => index
                );
                failures.push(Failure::new(index, "result logic error".into()));
            }
        }
    }

    if num_already_known > 0 {
        debug!(
            log,
            "Some unagg attestations already known";
            "count" => num_already_known
        );
    }

    if failures.is_empty() {
        Ok(())
    } else {
        Err(warp_utils::reject::indexed_bad_request(
            "error processing attestations".to_string(),
            failures,
        ))
    }
}
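
The "take the pending receivers out, wait on them, then write the results back by index" step in `publish_attestations` can be hard to follow inside the full function. The following is a minimal standalone sketch of that pattern only, not the Lighthouse implementation. It assumes a blocking `std::sync::mpsc` channel as a stand-in for the `tokio` oneshot, and the names `Outcome`, `Slot`, and `resolve` are hypothetical and exist purely for illustration.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

// Hypothetical stand-in for the final per-attestation result.
#[derive(Debug, PartialEq)]
enum Outcome {
    Success,
    Failure(&'static str),
}

// Hypothetical stand-in for a preliminary result: either already decided,
// or waiting on a worker to report back over a channel.
enum Slot {
    Done(Outcome),
    Pending(Receiver<Outcome>),
}

/// Resolve every pending slot while preserving the original request order.
fn resolve(mut slots: Vec<Option<Slot>>) -> Vec<Outcome> {
    // Move each receiver out of its slot and remember which index it came from.
    let pending: Vec<(usize, Receiver<Outcome>)> = slots
        .iter_mut()
        .enumerate()
        .filter_map(|(i, slot)| match slot.take()? {
            Slot::Pending(rx) => Some((i, rx)),
            done => {
                // Not pending: put the value back untouched.
                *slot = Some(done);
                None
            }
        })
        .collect();

    // Wait for each worker and write its result back at the same index.
    for (i, rx) in pending {
        // A dropped sender is reported as a timeout-style failure.
        let outcome = rx
            .recv()
            .unwrap_or(Outcome::Failure("reprocess timeout"));
        slots[i] = Some(Slot::Done(outcome));
    }

    // Any slot that is still empty or pending indicates a logic error.
    slots
        .into_iter()
        .map(|slot| match slot {
            Some(Slot::Done(outcome)) => outcome,
            _ => Outcome::Failure("logic error"),
        })
        .collect()
}

fn main() {
    let (tx, rx) = channel();
    let (dropped_tx, dropped_rx) = channel::<Outcome>();
    drop(dropped_tx); // Simulates a worker that never reports back.

    let worker = thread::spawn(move || {
        let _ = tx.send(Outcome::Success);
    });

    let results = resolve(vec![
        Some(Slot::Done(Outcome::Success)),
        Some(Slot::Pending(rx)),
        Some(Slot::Pending(dropped_rx)),
    ]);
    worker.join().unwrap();
    println!("{results:?}");
}
```

Wrapping each slot in `Option` is what lets the receiver be moved out with `take()` without cloning or reallocating the vector, and keeping the index alongside the receiver is what lets failures be reported against the caller's original request position.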