From e95f00342bc123c287ca8823e37b2c2b6aab15a1 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Tue, 7 Jan 2025 19:42:23 +0700 Subject: [PATCH 1/2] Cleanup single attestation imports --- beacon_node/beacon_chain/src/attestation_verification.rs | 2 +- beacon_node/beacon_chain/src/test_utils.rs | 1 - beacon_node/http_api/src/lib.rs | 2 +- beacon_node/http_api/src/publish_attestations.rs | 2 +- beacon_node/http_api/tests/tests.rs | 2 +- beacon_node/lighthouse_network/src/types/pubsub.rs | 2 +- beacon_node/network/src/network_beacon_processor/mod.rs | 1 - common/eth2/src/lib.rs | 2 +- common/eth2/src/types.rs | 1 - consensus/types/src/lib.rs | 2 +- consensus/types/src/subnet_id.rs | 2 +- 11 files changed, 8 insertions(+), 11 deletions(-) diff --git a/beacon_node/beacon_chain/src/attestation_verification.rs b/beacon_node/beacon_chain/src/attestation_verification.rs index cd284f557e..1a14cd2a69 100644 --- a/beacon_node/beacon_chain/src/attestation_verification.rs +++ b/beacon_node/beacon_chain/src/attestation_verification.rs @@ -60,7 +60,7 @@ use std::borrow::Cow; use strum::AsRefStr; use tree_hash::TreeHash; use types::{ - attestation::SingleAttestation, Attestation, AttestationRef, BeaconCommittee, + SingleAttestation, Attestation, AttestationRef, BeaconCommittee, BeaconStateError::NoCommitteeFound, ChainSpec, CommitteeIndex, Epoch, EthSpec, Hash256, IndexedAttestation, SelectionProof, SignedAggregateAndProof, Slot, SubnetId, }; diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index de0e677528..dda8d6e3f9 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -16,7 +16,6 @@ use crate::{ BeaconChain, BeaconChainTypes, BlockError, ChainConfig, ServerSentEventHandler, StateSkipConfig, }; -use attestation::SingleAttestation; use bls::get_withdrawal_credentials; use eth2::types::SignedBlockContentsTuple; use execution_layer::test_utils::generate_genesis_header; diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index eb27cac4da..d8da4443fd 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -81,7 +81,7 @@ use tokio_stream::{ StreamExt, }; use types::{ - attestation::SingleAttestation, fork_versioned_response::EmptyMetadata, Attestation, + SingleAttestation, fork_versioned_response::EmptyMetadata, Attestation, AttestationData, AttestationShufflingId, AttesterSlashing, BeaconStateError, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, diff --git a/beacon_node/http_api/src/publish_attestations.rs b/beacon_node/http_api/src/publish_attestations.rs index 2067ee66e5..5fc261438d 100644 --- a/beacon_node/http_api/src/publish_attestations.rs +++ b/beacon_node/http_api/src/publish_attestations.rs @@ -50,7 +50,7 @@ use tokio::sync::{ mpsc::{Sender, UnboundedSender}, oneshot, }; -use types::{attestation::SingleAttestation, Attestation, EthSpec}; +use types::{SingleAttestation, Attestation, EthSpec}; // Error variants are only used in `Debug` and considered `dead_code` by the compiler. #[derive(Debug)] diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index df8b346460..93607989cd 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -39,7 +39,7 @@ use tokio::time::Duration; use tree_hash::TreeHash; use types::application_domain::ApplicationDomain; use types::{ - attestation::AttestationBase, attestation::SingleAttestation, AggregateSignature, BitList, + attestation::AttestationBase, SingleAttestation, AggregateSignature, BitList, Domain, EthSpec, ExecutionBlockHash, Hash256, Keypair, MainnetEthSpec, RelativeEpoch, SelectionProof, SignedRoot, Slot, }; diff --git a/beacon_node/lighthouse_network/src/types/pubsub.rs b/beacon_node/lighthouse_network/src/types/pubsub.rs index ad95281d95..96748ae17b 100644 --- a/beacon_node/lighthouse_network/src/types/pubsub.rs +++ b/beacon_node/lighthouse_network/src/types/pubsub.rs @@ -6,7 +6,7 @@ use snap::raw::{decompress_len, Decoder, Encoder}; use ssz::{Decode, Encode}; use std::io::{Error, ErrorKind}; use std::sync::Arc; -use types::attestation::SingleAttestation; +use types::SingleAttestation; use types::{ Attestation, AttestationBase, AttestationElectra, AttesterSlashing, AttesterSlashingBase, AttesterSlashingElectra, BlobSidecar, DataColumnSidecar, DataColumnSubnetId, EthSpec, diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index d8c7e01e2a..8e8d23c3e6 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -1,7 +1,6 @@ use crate::sync::manager::BlockProcessType; use crate::sync::SamplingId; use crate::{service::NetworkMessage, sync::manager::SyncMessage}; -use attestation::SingleAttestation; use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::data_column_verification::{observe_gossip_data_column, GossipDataColumnError}; diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 7cf644d92f..2b10af6fd4 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -36,7 +36,7 @@ use std::future::Future; use std::path::PathBuf; use std::time::Duration; use store::fork_versioned_response::ExecutionOptimisticFinalizedForkVersionedResponse; -use types::attestation::SingleAttestation; +use types::SingleAttestation; pub const V1: EndpointVersion = EndpointVersion(1); pub const V2: EndpointVersion = EndpointVersion(2); diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index e96e35fc0a..803defb31a 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -5,7 +5,6 @@ use crate::{ Error as ServerError, CONSENSUS_BLOCK_VALUE_HEADER, CONSENSUS_VERSION_HEADER, EXECUTION_PAYLOAD_BLINDED_HEADER, EXECUTION_PAYLOAD_VALUE_HEADER, }; -use attestation::SingleAttestation; use lighthouse_network::{ConnectionDirection, Enr, Multiaddr, PeerConnectionStatus}; use mediatype::{names, MediaType, MediaTypeList}; use reqwest::header::HeaderMap; diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index dd304c6296..1e519034f4 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -116,7 +116,7 @@ pub use crate::aggregate_and_proof::{ }; pub use crate::attestation::{ Attestation, AttestationBase, AttestationElectra, AttestationRef, AttestationRefMut, - Error as AttestationError, + Error as AttestationError, SingleAttestation }; pub use crate::attestation_data::AttestationData; pub use crate::attestation_duty::AttestationDuty; diff --git a/consensus/types/src/subnet_id.rs b/consensus/types/src/subnet_id.rs index 54beaa3dff..c010e12ff3 100644 --- a/consensus/types/src/subnet_id.rs +++ b/consensus/types/src/subnet_id.rs @@ -1,5 +1,5 @@ //! Identifies each shard by an integer identifier. -use crate::attestation::SingleAttestation; +use crate::SingleAttestation; use crate::{AttestationRef, ChainSpec, CommitteeIndex, EthSpec, Slot}; use alloy_primitives::{bytes::Buf, U256}; use safe_arith::{ArithError, SafeArith}; From 823ddf5e75b63c77109922f0bc44556a605aaaa7 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Tue, 7 Jan 2025 19:53:00 +0700 Subject: [PATCH 2/2] Fix some single attestation network plumbing --- .../src/attestation_verification.rs | 6 +- beacon_node/http_api/src/lib.rs | 13 ++-- .../http_api/src/publish_attestations.rs | 2 +- beacon_node/http_api/tests/tests.rs | 6 +- .../lighthouse_network/src/types/pubsub.rs | 67 +++++++++---------- beacon_node/network/src/service.rs | 18 ++++- beacon_node/network/src/subnet_service/mod.rs | 6 +- common/eth2/src/lib.rs | 1 - common/eth2/src/types.rs | 5 ++ consensus/types/src/lib.rs | 2 +- 10 files changed, 72 insertions(+), 54 deletions(-) diff --git a/beacon_node/beacon_chain/src/attestation_verification.rs b/beacon_node/beacon_chain/src/attestation_verification.rs index 1a14cd2a69..e565f85ad9 100644 --- a/beacon_node/beacon_chain/src/attestation_verification.rs +++ b/beacon_node/beacon_chain/src/attestation_verification.rs @@ -60,9 +60,9 @@ use std::borrow::Cow; use strum::AsRefStr; use tree_hash::TreeHash; use types::{ - SingleAttestation, Attestation, AttestationRef, BeaconCommittee, - BeaconStateError::NoCommitteeFound, ChainSpec, CommitteeIndex, Epoch, EthSpec, Hash256, - IndexedAttestation, SelectionProof, SignedAggregateAndProof, Slot, SubnetId, + Attestation, AttestationRef, BeaconCommittee, BeaconStateError::NoCommitteeFound, ChainSpec, + CommitteeIndex, Epoch, EthSpec, Hash256, IndexedAttestation, SelectionProof, + SignedAggregateAndProof, SingleAttestation, Slot, SubnetId, }; pub use batch::{batch_verify_aggregated_attestations, batch_verify_unaggregated_attestations}; diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index d8da4443fd..9d1797eacf 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -81,13 +81,12 @@ use tokio_stream::{ StreamExt, }; use types::{ - SingleAttestation, fork_versioned_response::EmptyMetadata, Attestation, - AttestationData, AttestationShufflingId, AttesterSlashing, BeaconStateError, CommitteeCache, - ConfigAndPreset, Epoch, EthSpec, ForkName, ForkVersionedResponse, Hash256, - ProposerPreparationData, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, - SignedBlindedBeaconBlock, SignedBlsToExecutionChange, SignedContributionAndProof, - SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, SyncCommitteeMessage, - SyncContributionData, + fork_versioned_response::EmptyMetadata, Attestation, AttestationData, AttestationShufflingId, + AttesterSlashing, BeaconStateError, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, + ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing, RelativeEpoch, + SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange, + SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, + SingleAttestation, Slot, SyncCommitteeMessage, SyncContributionData, }; use validator::pubkey_to_validator_index; use version::{ diff --git a/beacon_node/http_api/src/publish_attestations.rs b/beacon_node/http_api/src/publish_attestations.rs index 5fc261438d..8e8d1a3f2f 100644 --- a/beacon_node/http_api/src/publish_attestations.rs +++ b/beacon_node/http_api/src/publish_attestations.rs @@ -50,7 +50,7 @@ use tokio::sync::{ mpsc::{Sender, UnboundedSender}, oneshot, }; -use types::{SingleAttestation, Attestation, EthSpec}; +use types::{Attestation, EthSpec, SingleAttestation}; // Error variants are only used in `Debug` and considered `dead_code` by the compiler. #[derive(Debug)] diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 93607989cd..7ebb6566f4 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -39,9 +39,9 @@ use tokio::time::Duration; use tree_hash::TreeHash; use types::application_domain::ApplicationDomain; use types::{ - attestation::AttestationBase, SingleAttestation, AggregateSignature, BitList, - Domain, EthSpec, ExecutionBlockHash, Hash256, Keypair, MainnetEthSpec, RelativeEpoch, - SelectionProof, SignedRoot, Slot, + attestation::AttestationBase, AggregateSignature, BitList, Domain, EthSpec, ExecutionBlockHash, + Hash256, Keypair, MainnetEthSpec, RelativeEpoch, SelectionProof, SignedRoot, SingleAttestation, + Slot, }; type E = MainnetEthSpec; diff --git a/beacon_node/lighthouse_network/src/types/pubsub.rs b/beacon_node/lighthouse_network/src/types/pubsub.rs index 96748ae17b..0c63b5d378 100644 --- a/beacon_node/lighthouse_network/src/types/pubsub.rs +++ b/beacon_node/lighthouse_network/src/types/pubsub.rs @@ -6,16 +6,15 @@ use snap::raw::{decompress_len, Decoder, Encoder}; use ssz::{Decode, Encode}; use std::io::{Error, ErrorKind}; use std::sync::Arc; -use types::SingleAttestation; use types::{ - Attestation, AttestationBase, AttestationElectra, AttesterSlashing, AttesterSlashingBase, - AttesterSlashingElectra, BlobSidecar, DataColumnSidecar, DataColumnSubnetId, EthSpec, - ForkContext, ForkName, LightClientFinalityUpdate, LightClientOptimisticUpdate, - ProposerSlashing, SignedAggregateAndProof, SignedAggregateAndProofBase, - SignedAggregateAndProofElectra, SignedBeaconBlock, SignedBeaconBlockAltair, - SignedBeaconBlockBase, SignedBeaconBlockBellatrix, SignedBeaconBlockCapella, - SignedBeaconBlockDeneb, SignedBeaconBlockElectra, SignedBlsToExecutionChange, - SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncCommitteeMessage, SyncSubnetId, + Attestation, AttestationBase, AttesterSlashing, AttesterSlashingBase, AttesterSlashingElectra, + BlobSidecar, DataColumnSidecar, DataColumnSubnetId, EthSpec, ForkContext, ForkName, + LightClientFinalityUpdate, LightClientOptimisticUpdate, ProposerSlashing, + SignedAggregateAndProof, SignedAggregateAndProofBase, SignedAggregateAndProofElectra, + SignedBeaconBlock, SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockBellatrix, + SignedBeaconBlockCapella, SignedBeaconBlockDeneb, SignedBeaconBlockElectra, + SignedBlsToExecutionChange, SignedContributionAndProof, SignedVoluntaryExit, SingleAttestation, + SubnetId, SyncCommitteeMessage, SyncSubnetId, }; #[derive(Debug, Clone, PartialEq)] @@ -194,32 +193,32 @@ impl PubsubMessage { ))) } GossipKind::Attestation(subnet_id) => { - let attestation = - match fork_context.from_context_bytes(gossip_topic.fork_digest) { - Some(&fork_name) => { - if fork_name.electra_enabled() { - Attestation::Electra( - AttestationElectra::from_ssz_bytes(data) - .map_err(|e| format!("{:?}", e))?, - ) - } else { - Attestation::Base( - AttestationBase::from_ssz_bytes(data) - .map_err(|e| format!("{:?}", e))?, - ) - } + match fork_context.from_context_bytes(gossip_topic.fork_digest) { + Some(&fork_name) => { + if fork_name.electra_enabled() { + let single_attestation = + SingleAttestation::from_ssz_bytes(data) + .map_err(|e| format!("{:?}", e))?; + Ok(PubsubMessage::SingleAttestation(Box::new(( + *subnet_id, + single_attestation, + )))) + } else { + let attestation = Attestation::Base( + AttestationBase::from_ssz_bytes(data) + .map_err(|e| format!("{:?}", e))?, + ); + Ok(PubsubMessage::Attestation(Box::new(( + *subnet_id, + attestation, + )))) } - None => { - return Err(format!( - "Unknown gossipsub fork digest: {:?}", - gossip_topic.fork_digest - )) - } - }; - Ok(PubsubMessage::Attestation(Box::new(( - *subnet_id, - attestation, - )))) + } + None => Err(format!( + "Unknown gossipsub fork digest: {:?}", + gossip_topic.fork_digest + )), + } } GossipKind::BeaconBlock => { let beacon_block = diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 7826807e03..f89241b4ae 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -549,7 +549,23 @@ impl NetworkService { // the attestation, else we just just propagate the Attestation. let should_process = self.subnet_service.should_process_attestation( Subnet::Attestation(subnet_id), - attestation, + attestation.data(), + ); + self.send_to_router(RouterMessage::PubsubMessage( + id, + source, + message, + should_process, + )); + } + PubsubMessage::SingleAttestation(ref subnet_and_attestation) => { + let subnet_id = subnet_and_attestation.0; + let single_attestation = &subnet_and_attestation.1; + // checks if we have an aggregator for the slot. If so, we should process + // the attestation, else we just just propagate the Attestation. + let should_process = self.subnet_service.should_process_attestation( + Subnet::Attestation(subnet_id), + &single_attestation.data, ); self.send_to_router(RouterMessage::PubsubMessage( id, diff --git a/beacon_node/network/src/subnet_service/mod.rs b/beacon_node/network/src/subnet_service/mod.rs index da1f220f04..33ae567eb3 100644 --- a/beacon_node/network/src/subnet_service/mod.rs +++ b/beacon_node/network/src/subnet_service/mod.rs @@ -17,7 +17,7 @@ use lighthouse_network::{discv5::enr::NodeId, NetworkConfig, Subnet, SubnetDisco use slog::{debug, error, o, warn}; use slot_clock::SlotClock; use types::{ - Attestation, EthSpec, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId, + AttestationData, EthSpec, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId, ValidatorSubscription, }; @@ -363,7 +363,7 @@ impl SubnetService { pub fn should_process_attestation( &self, subnet: Subnet, - attestation: &Attestation, + attestation_data: &AttestationData, ) -> bool { // Proposer-only mode does not need to process attestations if self.proposer_only { @@ -374,7 +374,7 @@ impl SubnetService { .map(|tracked_vals| { tracked_vals.contains_key(&ExactSubnet { subnet, - slot: attestation.data().slot, + slot: attestation_data.slot, }) }) .unwrap_or(true) diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 2b10af6fd4..af8573a578 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -36,7 +36,6 @@ use std::future::Future; use std::path::PathBuf; use std::time::Duration; use store::fork_versioned_response::ExecutionOptimisticFinalizedForkVersionedResponse; -use types::SingleAttestation; pub const V1: EndpointVersion = EndpointVersion(1); pub const V2: EndpointVersion = EndpointVersion(2); diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 803defb31a..ff2ba1af7f 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -1160,6 +1160,11 @@ impl EventKind { "attestation" => Ok(EventKind::Attestation(serde_json::from_str(data).map_err( |e| ServerError::InvalidServerSentEvent(format!("Attestation: {:?}", e)), )?)), + "single_attestation" => Ok(EventKind::SingleAttestation( + serde_json::from_str(data).map_err(|e| { + ServerError::InvalidServerSentEvent(format!("SingleAttestation: {:?}", e)) + })?, + )), "block" => Ok(EventKind::Block(serde_json::from_str(data).map_err( |e| ServerError::InvalidServerSentEvent(format!("Block: {:?}", e)), )?)), diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index 1e519034f4..161621174a 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -116,7 +116,7 @@ pub use crate::aggregate_and_proof::{ }; pub use crate::attestation::{ Attestation, AttestationBase, AttestationElectra, AttestationRef, AttestationRefMut, - Error as AttestationError, SingleAttestation + Error as AttestationError, SingleAttestation, }; pub use crate::attestation_data::AttestationData; pub use crate::attestation_duty::AttestationDuty;