From 29f2ec46d3d9ccb73799b3e185433acb5dff388b Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Tue, 1 Nov 2022 19:58:21 +0530 Subject: [PATCH] Couple blocks and blobs in gossip (#3670) * Revert "Add more gossip verification conditions" This reverts commit 1430b561c37adb44d5705005de6bf633deb8c16d. * Revert "Add todos" This reverts commit 91efb9d4c780b55025c3793a67bd9dacc1b2c924. * Revert "Reprocess blob sidecar messages" This reverts commit 21bf3d37cdce46632cfa4e3f5abb194f172c6851. * Add the coupled topic * Decode SignedBeaconBlockAndBlobsSidecar correctly * Process Block and Blobs in beacon processor * Remove extra blob publishing logic from vc * Remove blob signing in vc * Ugly hack to compile --- Cargo.lock | 2 + beacon_node/beacon_chain/src/beacon_chain.rs | 4 +- .../beacon_chain/src/blob_verification.rs | 97 +---- beacon_node/http_api/src/lib.rs | 28 +- beacon_node/http_api/src/publish_blobs.rs | 124 ------ beacon_node/http_api/src/publish_blocks.rs | 27 +- beacon_node/lighthouse_network/Cargo.toml | 2 + beacon_node/lighthouse_network/src/lib.rs | 1 + .../lighthouse_network/src/rpc/protocol.rs | 13 +- .../src/service/gossip_cache.rs | 10 +- .../lighthouse_network/src/types/mod.rs | 2 +- .../lighthouse_network/src/types/pubsub.rs | 71 +++- .../lighthouse_network/src/types/topics.rs | 14 +- .../network/src/beacon_processor/mod.rs | 92 ++--- .../work_reprocessing_queue.rs | 159 +------- .../beacon_processor/worker/gossip_methods.rs | 111 +---- beacon_node/network/src/router/mod.rs | 6 +- beacon_node/network/src/router/processor.rs | 11 +- beacon_node/store/src/hot_cold_store.rs | 15 +- beacon_node/store/src/lib.rs | 3 +- common/eth2/src/lib.rs | 2 +- consensus/types/src/lib.rs | 2 - consensus/types/src/signed_blobs_sidecar.rs | 58 --- validator_client/src/block_service.rs | 386 ++++++------------ validator_client/src/validator_store.rs | 42 +- 25 files changed, 293 insertions(+), 989 deletions(-) delete mode 100644 beacon_node/http_api/src/publish_blobs.rs delete mode 100644 consensus/types/src/signed_blobs_sidecar.rs diff --git a/Cargo.lock b/Cargo.lock index d0831365d7..297fca7117 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3700,6 +3700,8 @@ dependencies = [ "tokio", "tokio-io-timeout", "tokio-util 0.6.10", + "tree_hash", + "tree_hash_derive", "types", "unsigned-varint 0.6.0", "unused_port", diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index a937a6ea41..f1d8e5fbe0 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1778,11 +1778,11 @@ impl BeaconChain { }) } - /// Accepts some `SignedBlobsSidecar` from the network and attempts to verify it, + /// Accepts some `BlobsSidecar` received over from the network and attempts to verify it, /// returning `Ok(_)` if it is valid to be (re)broadcast on the gossip network. pub fn verify_blobs_sidecar_for_gossip<'a>( &self, - blobs_sidecar: &'a SignedBlobsSidecar, + blobs_sidecar: &'a BlobsSidecar, ) -> Result, BlobError> { metrics::inc_counter(&metrics::BLOBS_SIDECAR_PROCESSING_REQUESTS); let _timer = metrics::start_timer(&metrics::BLOBS_SIDECAR_GOSSIP_VERIFICATION_TIMES); diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index be9b0effc1..d3e0d2a17d 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -1,13 +1,10 @@ use derivative::Derivative; use slot_clock::SlotClock; -use crate::beacon_chain::{ - BeaconChain, BeaconChainTypes, MAXIMUM_GOSSIP_CLOCK_DISPARITY, - VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT, -}; +use crate::beacon_chain::{BeaconChain, BeaconChainTypes, MAXIMUM_GOSSIP_CLOCK_DISPARITY}; use crate::BeaconChainError; use bls::PublicKey; -use types::{consts::eip4844::BLS_MODULUS, BeaconStateError, Hash256, SignedBlobsSidecar, Slot}; +use types::{consts::eip4844::BLS_MODULUS, BeaconStateError, BlobsSidecar, Hash256, Slot}; pub enum BlobError { /// The blob sidecar is from a slot that is later than the current slot (with respect to the @@ -61,21 +58,6 @@ pub enum BlobError { /// be equal to the given sidecar. RepeatSidecar { proposer: u64, slot: Slot }, - /// The `blobs_sidecar.message.beacon_block_root` block is unknown. - /// - /// ## Peer scoring - /// - /// The attestation points to a block we have not yet imported. It's unclear if the attestation - /// is valid or not. - UnknownHeadBlock { beacon_block_root: Hash256 }, - - /// The proposal_index corresponding to blob.beacon_block_root is not known. - /// - /// ## Peer scoring - /// - /// The block is invalid and the peer is faulty. - UnknownValidator(u64), - /// There was an error whilst processing the sync contribution. It is not known if it is valid or invalid. /// /// ## Peer scoring @@ -97,109 +79,58 @@ impl From for BlobError { } } -/// A wrapper around a `SignedBlobsSidecar` that indicates it has been approved for re-gossiping on -/// the p2p network. +/// A wrapper around a `BlobsSidecar` that indicates it has been verified w.r.t the corresponding +/// `SignedBeaconBlock`. #[derive(Derivative)] #[derivative(Debug(bound = "T: BeaconChainTypes"))] pub struct VerifiedBlobsSidecar<'a, T: BeaconChainTypes> { - pub blob_sidecar: &'a SignedBlobsSidecar, + pub blob_sidecar: &'a BlobsSidecar, } impl<'a, T: BeaconChainTypes> VerifiedBlobsSidecar<'a, T> { pub fn verify( - blob_sidecar: &'a SignedBlobsSidecar, + blob_sidecar: &'a BlobsSidecar, chain: &BeaconChain, ) -> Result { - let block_slot = blob_sidecar.message.beacon_block_slot; - let block_root = blob_sidecar.message.beacon_block_root; + let blob_slot = blob_sidecar.beacon_block_slot; // Do not gossip or process blobs from future or past slots. let latest_permissible_slot = chain .slot_clock .now_with_future_tolerance(MAXIMUM_GOSSIP_CLOCK_DISPARITY) .ok_or(BeaconChainError::UnableToReadSlot)?; - if block_slot > latest_permissible_slot { + if blob_slot > latest_permissible_slot { return Err(BlobError::FutureSlot { message_slot: latest_permissible_slot, - latest_permissible_slot: block_slot, + latest_permissible_slot: blob_slot, }); } - // TODO: return `UnknownHeadBlock` if blob_root doesn't exist in fork choice - // and wherever it could be found. - let earliest_permissible_slot = chain .slot_clock .now_with_past_tolerance(MAXIMUM_GOSSIP_CLOCK_DISPARITY) .ok_or(BeaconChainError::UnableToReadSlot)?; - if block_slot > earliest_permissible_slot { + if blob_slot > earliest_permissible_slot { return Err(BlobError::PastSlot { message_slot: earliest_permissible_slot, - earliest_permissible_slot: block_slot, + earliest_permissible_slot: blob_slot, }); } // Verify that blobs are properly formatted //TODO: add the check while constructing a Blob type from bytes instead of after - for (i, blob) in blob_sidecar.message.blobs.iter().enumerate() { + for (i, blob) in blob_sidecar.blobs.iter().enumerate() { if blob.iter().any(|b| *b >= *BLS_MODULUS) { return Err(BlobError::BlobOutOfRange { blob_index: i }); } } // Verify that the KZG proof is a valid G1 point - // TODO(pawan): KZG commitment can also be point at infinity, use a different check - // (bls.KeyValidate) - if PublicKey::deserialize(&blob_sidecar.message.kzg_aggregate_proof.0).is_err() { + if PublicKey::deserialize(&blob_sidecar.kzg_aggregate_proof.0).is_err() { return Err(BlobError::InvalidKZGCommitment); } - let proposer_shuffling_root = chain - .canonical_head - .cached_head() - .snapshot - .beacon_state - .proposer_shuffling_decision_root(block_root)?; + // TODO: Check that we have not already received a sidecar with a valid signature for this slot. - let (proposer_index, fork) = match chain - .beacon_proposer_cache - .lock() - .get_slot::(proposer_shuffling_root, block_slot) - { - Some(proposer) => (proposer.index, proposer.fork), - None => { - let state = &chain.canonical_head.cached_head().snapshot.beacon_state; - ( - state.get_beacon_proposer_index(block_slot, &chain.spec)?, - state.fork(), - ) - } - }; - let signature_is_valid = { - let pubkey_cache = chain - .validator_pubkey_cache - .try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT) - .ok_or(BeaconChainError::ValidatorPubkeyCacheLockTimeout) - .map_err(BlobError::BeaconChainError)?; - - let pubkey = pubkey_cache - .get(proposer_index as usize) - .ok_or_else(|| BlobError::UnknownValidator(proposer_index as u64))?; - - blob_sidecar.verify_signature( - None, - pubkey, - &fork, - chain.genesis_validators_root, - &chain.spec, - ) - }; - - if !signature_is_valid { - return Err(BlobError::ProposalSignatureInvalid); - } - - // TODO(pawan): Check that we have not already received a sidecar with a valid signature for this slot. - // TODO(pawan): check if block hash is already known Ok(Self { blob_sidecar }) } } diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 203b462b16..edb1ad091f 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -13,7 +13,6 @@ mod block_rewards; mod database; mod metrics; mod proposer_duties; -mod publish_blobs; mod publish_blocks; mod state_id; mod sync_committees; @@ -49,7 +48,7 @@ use types::{ Attestation, AttestationData, AttesterSlashing, BeaconStateError, BlindedPayload, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, FullPayload, ProposerPreparationData, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, - SignedBeaconBlock, SignedBlindedBeaconBlock, SignedBlobsSidecar, SignedContributionAndProof, + SignedBeaconBlock, SignedBlindedBeaconBlock, SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, SyncCommitteeMessage, SyncContributionData, }; @@ -1047,27 +1046,9 @@ pub fn serve( chain: Arc>, network_tx: UnboundedSender>, log: Logger| async move { - publish_blocks::publish_block(None, block, chain, &network_tx, log) - .await - .map(|()| warp::reply()) - }, - ); - - // POST beacon/blobs - let post_beacon_blobs = eth_v1 - .and(warp::path("beacon")) - .and(warp::path("blobs")) - .and(warp::path::end()) - .and(warp::body::json()) - .and(chain_filter.clone()) - .and(network_tx_filter.clone()) - .and(log_filter.clone()) - .and_then( - |blobs: Arc>, - chain: Arc>, - network_tx: UnboundedSender>, - log: Logger| async move { - publish_blobs::publish_blobs(blobs, chain, &network_tx, log) + // need to have cached the blob sidecar somewhere in the beacon chain + // to publish + publish_blocks::publish_block(None, block, None, chain, &network_tx, log) .await .map(|()| warp::reply()) }, @@ -3183,7 +3164,6 @@ pub fn serve( post_beacon_blocks .boxed() .or(post_beacon_blinded_blocks.boxed()) - .or(post_beacon_blobs.boxed()) .or(post_beacon_pool_attestations.boxed()) .or(post_beacon_pool_attester_slashings.boxed()) .or(post_beacon_pool_proposer_slashings.boxed()) diff --git a/beacon_node/http_api/src/publish_blobs.rs b/beacon_node/http_api/src/publish_blobs.rs deleted file mode 100644 index cf08ac2d2b..0000000000 --- a/beacon_node/http_api/src/publish_blobs.rs +++ /dev/null @@ -1,124 +0,0 @@ -use crate::metrics; -use beacon_chain::validator_monitor::{get_slot_delay_ms, timestamp_now}; -use beacon_chain::{BeaconChain, BeaconChainTypes}; -use lighthouse_network::PubsubMessage; -use network::NetworkMessage; -use slog::Logger; -use std::sync::Arc; -use tokio::sync::mpsc::UnboundedSender; -use types::SignedBlobsSidecar; -use warp::Rejection; - -/// Handles a request from the HTTP API for full blocks. -pub async fn publish_blobs( - blobs_sidecar: Arc>, - chain: Arc>, - network_tx: &UnboundedSender>, - log: Logger, -) -> Result<(), Rejection> { - let seen_timestamp = timestamp_now(); - - // Send the blob, regardless of whether or not it is valid. The API - // specification is very clear that this is the desired behaviour. - crate::publish_pubsub_message( - network_tx, - PubsubMessage::BlobsSidecars(blobs_sidecar.clone()), - )?; - - // Determine the delay after the start of the slot, register it with metrics. - let delay = get_slot_delay_ms( - seen_timestamp, - blobs_sidecar.message.beacon_block_slot, - &chain.slot_clock, - ); - metrics::observe_duration(&metrics::HTTP_API_BLOB_BROADCAST_DELAY_TIMES, delay); - - //FIXME(sean) process blobs - // match chain - // .process_block(blobs_sidecar.clone(), CountUnrealized::True) - // .await - // { - // Ok(root) => { - // info!( - // log, - // "Valid block from HTTP API"; - // "block_delay" => ?delay, - // "root" => format!("{}", root), - // "proposer_index" => block.message().proposer_index(), - // "slot" => block.slot(), - // ); - // - // // Notify the validator monitor. - // chain.validator_monitor.read().register_api_block( - // seen_timestamp, - // blobs_sidecar.message(), - // root, - // &chain.slot_clock, - // ); - // - // // Update the head since it's likely this block will become the new - // // head. - // chain.recompute_head_at_current_slot().await; - // - // // Perform some logging to inform users if their blocks are being produced - // // late. - // // - // // Check to see the thresholds are non-zero to avoid logging errors with small - // // slot times (e.g., during testing) - // let crit_threshold = chain.slot_clock.unagg_attestation_production_delay(); - // let error_threshold = crit_threshold / 2; - // if delay >= crit_threshold { - // crit!( - // log, - // "Block was broadcast too late"; - // "msg" => "system may be overloaded, block likely to be orphaned", - // "delay_ms" => delay.as_millis(), - // "slot" => block.slot(), - // "root" => ?root, - // ) - // } else if delay >= error_threshold { - // error!( - // log, - // "Block broadcast was delayed"; - // "msg" => "system may be overloaded, block may be orphaned", - // "delay_ms" => delay.as_millis(), - // "slot" => block.slot(), - // "root" => ?root, - // ) - // } - // - // Ok(()) - // } - // Err(BlockError::BlockIsAlreadyKnown) => { - // info!( - // log, - // "Block from HTTP API already known"; - // "block" => ?block.canonical_root(), - // "slot" => block.slot(), - // ); - // Ok(()) - // } - // Err(BlockError::RepeatProposal { proposer, slot }) => { - // warn!( - // log, - // "Block ignored due to repeat proposal"; - // "msg" => "this can happen when a VC uses fallback BNs. \ - // whilst this is not necessarily an error, it can indicate issues with a BN \ - // or between the VC and BN.", - // "slot" => slot, - // "proposer" => proposer, - // ); - // Ok(()) - // } - // Err(e) => { - // let msg = format!("{:?}", e); - // error!( - // log, - // "Invalid block provided to HTTP API"; - // "reason" => &msg - // ); - // Err(warp_utils::reject::broadcast_without_import(msg)) - // } - // } - Ok(()) -} diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 09159d3c0c..0167da8d47 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -1,7 +1,7 @@ use crate::metrics; use beacon_chain::validator_monitor::{get_block_delay_ms, timestamp_now}; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, CountUnrealized}; -use lighthouse_network::PubsubMessage; +use lighthouse_network::{PubsubMessage, SignedBeaconBlockAndBlobsSidecar}; use network::NetworkMessage; use slog::{crit, error, info, warn, Logger}; use slot_clock::SlotClock; @@ -9,8 +9,8 @@ use std::sync::Arc; use tokio::sync::mpsc::UnboundedSender; use tree_hash::TreeHash; use types::{ - AbstractExecPayload, BlindedPayload, EthSpec, ExecPayload, ExecutionBlockHash, FullPayload, - Hash256, SignedBeaconBlock, + AbstractExecPayload, BlindedPayload, BlobsSidecar, EthSpec, ExecPayload, ExecutionBlockHash, + FullPayload, Hash256, SignedBeaconBlock, SignedBeaconBlockEip4844, }; use warp::Rejection; @@ -18,6 +18,7 @@ use warp::Rejection; pub async fn publish_block( block_root: Option, block: Arc>, + blobs_sidecar: Option>>, chain: Arc>, network_tx: &UnboundedSender>, log: Logger, @@ -26,7 +27,24 @@ pub async fn publish_block( // Send the block, regardless of whether or not it is valid. The API // specification is very clear that this is the desired behaviour. - crate::publish_pubsub_message(network_tx, PubsubMessage::BeaconBlock(block.clone()))?; + + let message = match &*block { + SignedBeaconBlock::Eip4844(block) => { + if let Some(sidecar) = blobs_sidecar { + PubsubMessage::BeaconBlockAndBlobsSidecars(Arc::new( + SignedBeaconBlockAndBlobsSidecar { + beacon_block: block.clone(), + blobs_sidecar: (*sidecar).clone(), + }, + )) + } else { + //TODO(pawan): return an empty sidecar instead + return Err(warp_utils::reject::broadcast_without_import(format!(""))); + } + } + _ => PubsubMessage::BeaconBlock(block.clone()), + }; + crate::publish_pubsub_message(network_tx, message)?; // Determine the delay after the start of the slot, register it with metrics. let delay = get_block_delay_ms(seen_timestamp, block.message(), &chain.slot_clock); @@ -135,6 +153,7 @@ pub async fn publish_blinded_block( publish_block::( Some(block_root), Arc::new(full_block), + None, chain, network_tx, log, diff --git a/beacon_node/lighthouse_network/Cargo.toml b/beacon_node/lighthouse_network/Cargo.toml index e5af0a7499..a8a4af1c8b 100644 --- a/beacon_node/lighthouse_network/Cargo.toml +++ b/beacon_node/lighthouse_network/Cargo.toml @@ -13,6 +13,8 @@ serde = { version = "1.0.116", features = ["derive"] } serde_derive = "1.0.116" eth2_ssz = "0.4.1" eth2_ssz_derive = "0.3.0" +tree_hash = "0.4.1" +tree_hash_derive = "0.4.0" slog = { version = "2.5.2", features = ["max_level_trace"] } lighthouse_version = { path = "../../common/lighthouse_version" } tokio = { version = "1.14.0", features = ["time", "macros"] } diff --git a/beacon_node/lighthouse_network/src/lib.rs b/beacon_node/lighthouse_network/src/lib.rs index be4da809cb..d7733f7cd3 100644 --- a/beacon_node/lighthouse_network/src/lib.rs +++ b/beacon_node/lighthouse_network/src/lib.rs @@ -15,6 +15,7 @@ pub mod peer_manager; pub mod rpc; pub mod types; +pub use crate::types::SignedBeaconBlockAndBlobsSidecar; pub use config::gossip_max_size; use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index dc1110954a..8511d26208 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -21,7 +21,6 @@ use tokio_util::{ compat::{Compat, FuturesAsyncReadCompatExt}, }; use types::BlobsSidecar; -use types::SignedBlobsSidecar; use types::{ BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockMerge, Blob, EthSpec, ForkContext, ForkName, Hash256, MainnetEthSpec, Signature, SignedBeaconBlock, @@ -108,13 +107,11 @@ lazy_static! { .as_ssz_bytes() .len(); - pub static ref SIGNED_BLOBS_SIDECAR_MIN: usize = SignedBlobsSidecar { - message: BlobsSidecar::::empty(), - signature: Signature::empty(), - }.as_ssz_bytes() + pub static ref BLOBS_SIDECAR_MIN: usize = BlobsSidecar::::empty() + .as_ssz_bytes() .len(); - pub static ref SIGNED_BLOBS_SIDECAR_MAX: usize = *SIGNED_BLOBS_SIDECAR_MIN // Max size of variable length `blobs` field + pub static ref BLOBS_SIDECAR_MAX: usize = *BLOBS_SIDECAR_MIN // Max size of variable length `blobs` field + (MainnetEthSpec::max_blobs_per_block() * as Encode>::ssz_fixed_len()); } @@ -343,9 +340,7 @@ impl ProtocolId { Protocol::Goodbye => RpcLimits::new(0, 0), // Goodbye request has no response Protocol::BlocksByRange => rpc_block_limits_by_fork(fork_context.current_fork()), Protocol::BlocksByRoot => rpc_block_limits_by_fork(fork_context.current_fork()), - Protocol::BlobsByRange => { - RpcLimits::new(*SIGNED_BLOBS_SIDECAR_MIN, *SIGNED_BLOBS_SIDECAR_MAX) - } + Protocol::BlobsByRange => RpcLimits::new(*BLOBS_SIDECAR_MIN, *BLOBS_SIDECAR_MAX), Protocol::Ping => RpcLimits::new( ::ssz_fixed_len(), ::ssz_fixed_len(), diff --git a/beacon_node/lighthouse_network/src/service/gossip_cache.rs b/beacon_node/lighthouse_network/src/service/gossip_cache.rs index 1c6ffd022d..665e383f20 100644 --- a/beacon_node/lighthouse_network/src/service/gossip_cache.rs +++ b/beacon_node/lighthouse_network/src/service/gossip_cache.rs @@ -21,7 +21,7 @@ pub struct GossipCache { /// Timeout for blocks. beacon_block: Option, /// Timeout for blobs. - blobs_sidecar: Option, + beacon_block_and_blobs_sidecar: Option, /// Timeout for aggregate attestations. aggregates: Option, /// Timeout for attestations. @@ -44,7 +44,7 @@ pub struct GossipCacheBuilder { /// Timeout for blocks. beacon_block: Option, /// Timeout for blob sidecars. - blobs_sidecar: Option, + beacon_block_and_blobs_sidecar: Option, /// Timeout for aggregate attestations. aggregates: Option, /// Timeout for attestations. @@ -121,7 +121,7 @@ impl GossipCacheBuilder { let GossipCacheBuilder { default_timeout, beacon_block, - blobs_sidecar, + beacon_block_and_blobs_sidecar, aggregates, attestation, voluntary_exit, @@ -134,7 +134,7 @@ impl GossipCacheBuilder { expirations: DelayQueue::default(), topic_msgs: HashMap::default(), beacon_block: beacon_block.or(default_timeout), - blobs_sidecar: blobs_sidecar.or(default_timeout), + beacon_block_and_blobs_sidecar: beacon_block_and_blobs_sidecar.or(default_timeout), aggregates: aggregates.or(default_timeout), attestation: attestation.or(default_timeout), voluntary_exit: voluntary_exit.or(default_timeout), @@ -157,7 +157,7 @@ impl GossipCache { pub fn insert(&mut self, topic: GossipTopic, data: Vec) { let expire_timeout = match topic.kind() { GossipKind::BeaconBlock => self.beacon_block, - GossipKind::BlobsSidecar => self.blobs_sidecar, + GossipKind::BeaconBlocksAndBlobsSidecar => self.beacon_block_and_blobs_sidecar, GossipKind::BeaconAggregateAndProof => self.aggregates, GossipKind::Attestation(_) => self.attestation, GossipKind::VoluntaryExit => self.voluntary_exit, diff --git a/beacon_node/lighthouse_network/src/types/mod.rs b/beacon_node/lighthouse_network/src/types/mod.rs index ad02e07fb7..404311ac16 100644 --- a/beacon_node/lighthouse_network/src/types/mod.rs +++ b/beacon_node/lighthouse_network/src/types/mod.rs @@ -13,7 +13,7 @@ pub type EnrSyncCommitteeBitfield = BitVector<::SyncCommitteeSu pub type Enr = discv5::enr::Enr; pub use globals::NetworkGlobals; -pub use pubsub::{PubsubMessage, SnappyTransform}; +pub use pubsub::{PubsubMessage, SignedBeaconBlockAndBlobsSidecar, SnappyTransform}; pub use subnet::{Subnet, SubnetDiscovery}; pub use sync_state::{BackFillState, SyncState}; pub use topics::{subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, CORE_TOPICS}; diff --git a/beacon_node/lighthouse_network/src/types/pubsub.rs b/beacon_node/lighthouse_network/src/types/pubsub.rs index 6f6de68ecb..1b14c93c09 100644 --- a/beacon_node/lighthouse_network/src/types/pubsub.rs +++ b/beacon_node/lighthouse_network/src/types/pubsub.rs @@ -3,25 +3,37 @@ use crate::types::{GossipEncoding, GossipKind, GossipTopic}; use crate::TopicHash; use libp2p::gossipsub::{DataTransform, GossipsubMessage, RawGossipsubMessage}; +use serde_derive::{Deserialize, Serialize}; use snap::raw::{decompress_len, Decoder, Encoder}; use ssz::{Decode, Encode}; +use ssz_derive::{Decode, Encode}; use std::boxed::Box; use std::io::{Error, ErrorKind}; use std::sync::Arc; -use types::signed_blobs_sidecar::SignedBlobsSidecar; +use tree_hash_derive::TreeHash; use types::{ - Attestation, AttesterSlashing, EthSpec, ForkContext, ForkName, ProposerSlashing, + Attestation, AttesterSlashing, BlobsSidecar, EthSpec, ForkContext, ForkName, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockCapella, SignedBeaconBlockEip4844, SignedBeaconBlockMerge, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncCommitteeMessage, SyncSubnetId, }; +/// TODO(pawan): move this to consensus/types? strictly not a consensus type +#[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode, TreeHash, PartialEq)] +#[serde(bound = "T: EthSpec")] +pub struct SignedBeaconBlockAndBlobsSidecar { + // TODO(pawan): switch to a SignedBeaconBlock and use ssz offsets for decoding to make this + // future proof? + pub beacon_block: SignedBeaconBlockEip4844, + pub blobs_sidecar: BlobsSidecar, +} + #[derive(Debug, Clone, PartialEq)] pub enum PubsubMessage { /// Gossipsub message providing notification of a new block. BeaconBlock(Arc>), - /// Gossipsub message providing notification of a new blobs sidecar. - BlobsSidecars(Arc>), + /// Gossipsub message providing notification of a new SignedBeaconBlock coupled with a blobs sidecar. + BeaconBlockAndBlobsSidecars(Arc>), /// Gossipsub message providing notification of a Aggregate attestation and associated proof. AggregateAndProofAttestation(Box>), /// Gossipsub message providing notification of a raw un-aggregated attestation with its shard id. @@ -109,7 +121,9 @@ impl PubsubMessage { pub fn kind(&self) -> GossipKind { match self { PubsubMessage::BeaconBlock(_) => GossipKind::BeaconBlock, - PubsubMessage::BlobsSidecars(_) => GossipKind::BlobsSidecar, + PubsubMessage::BeaconBlockAndBlobsSidecars(_) => { + GossipKind::BeaconBlocksAndBlobsSidecar + } PubsubMessage::AggregateAndProofAttestation(_) => GossipKind::BeaconAggregateAndProof, PubsubMessage::Attestation(attestation_data) => { GossipKind::Attestation(attestation_data.0) @@ -171,10 +185,12 @@ impl PubsubMessage { SignedBeaconBlockMerge::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?, ), - Some(ForkName::Eip4844) => SignedBeaconBlock::::Eip4844( - SignedBeaconBlockEip4844::from_ssz_bytes(data) - .map_err(|e| format!("{:?}", e))?, - ), + Some(ForkName::Eip4844) => { + return Err( + "beacon_block topic is not used from eip4844 fork onwards" + .to_string(), + ) + } Some(ForkName::Capella) => SignedBeaconBlock::::Capella( SignedBeaconBlockCapella::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?, @@ -188,10 +204,29 @@ impl PubsubMessage { }; Ok(PubsubMessage::BeaconBlock(Arc::new(beacon_block))) } - GossipKind::BlobsSidecar => { - let blobs_sidecar = SignedBlobsSidecar::from_ssz_bytes(data) - .map_err(|e| format!("{:?}", e))?; - Ok(PubsubMessage::BlobsSidecars(Arc::new(blobs_sidecar))) + GossipKind::BeaconBlocksAndBlobsSidecar => { + match fork_context.from_context_bytes(gossip_topic.fork_digest) { + Some(ForkName::Eip4844) => { + let block_and_blobs_sidecar = + SignedBeaconBlockAndBlobsSidecar::from_ssz_bytes(data) + .map_err(|e| format!("{:?}", e))?; + Ok(PubsubMessage::BeaconBlockAndBlobsSidecars(Arc::new( + block_and_blobs_sidecar, + ))) + } + Some( + ForkName::Base + | ForkName::Altair + | ForkName::Merge + | ForkName::Capella, + ) + | None => { + return Err(format!( + "beacon_blobs_and_sidecar topic invalid for given fork digest {:?}", + gossip_topic.fork_digest + )) + } + } } GossipKind::VoluntaryExit => { let voluntary_exit = SignedVoluntaryExit::from_ssz_bytes(data) @@ -237,7 +272,7 @@ impl PubsubMessage { // messages for us. match &self { PubsubMessage::BeaconBlock(data) => data.as_ssz_bytes(), - PubsubMessage::BlobsSidecars(data) => data.as_ssz_bytes(), + PubsubMessage::BeaconBlockAndBlobsSidecars(data) => data.as_ssz_bytes(), PubsubMessage::AggregateAndProofAttestation(data) => data.as_ssz_bytes(), PubsubMessage::VoluntaryExit(data) => data.as_ssz_bytes(), PubsubMessage::ProposerSlashing(data) => data.as_ssz_bytes(), @@ -258,11 +293,11 @@ impl std::fmt::Display for PubsubMessage { block.slot(), block.message().proposer_index() ), - PubsubMessage::BlobsSidecars(blobs) => write!( + PubsubMessage::BeaconBlockAndBlobsSidecars(block_and_blob) => write!( f, - "Blobs Sidecar: slot: {}, blobs: {}", - blobs.message.beacon_block_slot, - blobs.message.blobs.len(), + "Beacon block and Blobs Sidecar: slot: {}, blobs: {}", + block_and_blob.beacon_block.message.slot, + block_and_blob.blobs_sidecar.blobs.len(), ), PubsubMessage::AggregateAndProofAttestation(att) => write!( f, diff --git a/beacon_node/lighthouse_network/src/types/topics.rs b/beacon_node/lighthouse_network/src/types/topics.rs index 3e77264809..1be94a93f1 100644 --- a/beacon_node/lighthouse_network/src/types/topics.rs +++ b/beacon_node/lighthouse_network/src/types/topics.rs @@ -11,7 +11,7 @@ use crate::Subnet; pub const TOPIC_PREFIX: &str = "eth2"; pub const SSZ_SNAPPY_ENCODING_POSTFIX: &str = "ssz_snappy"; pub const BEACON_BLOCK_TOPIC: &str = "beacon_block"; -pub const BLOBS_SIDECAR_TOPIC: &str = "blobs_sidecar"; +pub const BEACON_BLOCK_AND_BLOBS_SIDECAR_TOPIC: &str = "beacon_blocks_and_blobs_sidecar"; pub const BEACON_AGGREGATE_AND_PROOF_TOPIC: &str = "beacon_aggregate_and_proof"; pub const BEACON_ATTESTATION_PREFIX: &str = "beacon_attestation_"; pub const VOLUNTARY_EXIT_TOPIC: &str = "voluntary_exit"; @@ -22,7 +22,7 @@ pub const SYNC_COMMITTEE_PREFIX_TOPIC: &str = "sync_committee_"; pub const CORE_TOPICS: [GossipKind; 7] = [ GossipKind::BeaconBlock, - GossipKind::BlobsSidecar, + GossipKind::BeaconBlocksAndBlobsSidecar, GossipKind::BeaconAggregateAndProof, GossipKind::VoluntaryExit, GossipKind::ProposerSlashing, @@ -49,8 +49,8 @@ pub struct GossipTopic { pub enum GossipKind { /// Topic for publishing beacon blocks. BeaconBlock, - /// Topic for publishing blob sidecars. - BlobsSidecar, + /// Topic for publishing beacon block coupled with blob sidecars. + BeaconBlocksAndBlobsSidecar, /// Topic for publishing aggregate attestations and proofs. BeaconAggregateAndProof, /// Topic for publishing raw attestations on a particular subnet. @@ -136,6 +136,7 @@ impl GossipTopic { let kind = match topic_parts[3] { BEACON_BLOCK_TOPIC => GossipKind::BeaconBlock, BEACON_AGGREGATE_AND_PROOF_TOPIC => GossipKind::BeaconAggregateAndProof, + BEACON_BLOCK_AND_BLOBS_SIDECAR_TOPIC => GossipKind::BeaconBlocksAndBlobsSidecar, SIGNED_CONTRIBUTION_AND_PROOF_TOPIC => GossipKind::SignedContributionAndProof, VOLUNTARY_EXIT_TOPIC => GossipKind::VoluntaryExit, PROPOSER_SLASHING_TOPIC => GossipKind::ProposerSlashing, @@ -182,7 +183,7 @@ impl From for String { let kind = match topic.kind { GossipKind::BeaconBlock => BEACON_BLOCK_TOPIC.into(), - GossipKind::BlobsSidecar => BLOBS_SIDECAR_TOPIC.into(), + GossipKind::BeaconBlocksAndBlobsSidecar => BEACON_BLOCK_AND_BLOBS_SIDECAR_TOPIC.into(), GossipKind::BeaconAggregateAndProof => BEACON_AGGREGATE_AND_PROOF_TOPIC.into(), GossipKind::VoluntaryExit => VOLUNTARY_EXIT_TOPIC.into(), GossipKind::ProposerSlashing => PROPOSER_SLASHING_TOPIC.into(), @@ -211,7 +212,7 @@ impl std::fmt::Display for GossipTopic { let kind = match self.kind { GossipKind::BeaconBlock => BEACON_BLOCK_TOPIC.into(), - GossipKind::BlobsSidecar => BLOBS_SIDECAR_TOPIC.into(), + GossipKind::BeaconBlocksAndBlobsSidecar => BEACON_BLOCK_AND_BLOBS_SIDECAR_TOPIC.into(), GossipKind::BeaconAggregateAndProof => BEACON_AGGREGATE_AND_PROOF_TOPIC.into(), GossipKind::VoluntaryExit => VOLUNTARY_EXIT_TOPIC.into(), GossipKind::ProposerSlashing => PROPOSER_SLASHING_TOPIC.into(), @@ -293,6 +294,7 @@ mod tests { VoluntaryExit, ProposerSlashing, AttesterSlashing, + BeaconBlocksAndBlobsSidecar, ] .iter() { diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index 87e092332f..dd28b15c0c 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -46,6 +46,7 @@ use derivative::Derivative; use futures::stream::{Stream, StreamExt}; use futures::task::Poll; use lighthouse_network::rpc::methods::BlobsByRangeRequest; +use lighthouse_network::SignedBeaconBlockAndBlobsSidecar; use lighthouse_network::{ rpc::{BlocksByRangeRequest, BlocksByRootRequest, StatusMessage}, Client, MessageId, NetworkGlobals, PeerId, PeerRequestId, @@ -61,7 +62,6 @@ use std::time::Duration; use std::{cmp, collections::HashSet}; use task_executor::TaskExecutor; use tokio::sync::mpsc; -use types::signed_blobs_sidecar::SignedBlobsSidecar; use types::{ Attestation, AttesterSlashing, Hash256, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, @@ -80,8 +80,6 @@ mod worker; use crate::beacon_processor::work_reprocessing_queue::QueuedGossipBlock; pub use worker::{ChainSegmentProcessId, GossipAggregatePackage, GossipAttestationPackage}; -use self::work_reprocessing_queue::QueuedBlobsSidecar; - /// The maximum size of the channel for work events to the `BeaconProcessor`. /// /// Setting this too low will cause consensus messages to be dropped. @@ -117,9 +115,7 @@ const MAX_AGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN: usize = 1_024; const MAX_GOSSIP_BLOCK_QUEUE_LEN: usize = 1_024; //FIXME(sean) verify -const MAX_GOSSIP_BLOB_QUEUE_LEN: usize = 1_024; -//FIXME(sean) verify -const MAX_BLOBS_SIDECAR_REPROCESS_QUEUE_LEN: usize = 1_024; +const MAX_GOSSIP_BLOCK_AND_BLOB_QUEUE_LEN: usize = 1_024; /// The maximum number of queued `SignedBeaconBlock` objects received prior to their slot (but /// within acceptable clock disparity) that will be queued before we start dropping them. @@ -195,7 +191,7 @@ pub const GOSSIP_ATTESTATION_BATCH: &str = "gossip_attestation_batch"; pub const GOSSIP_AGGREGATE: &str = "gossip_aggregate"; pub const GOSSIP_AGGREGATE_BATCH: &str = "gossip_aggregate_batch"; pub const GOSSIP_BLOCK: &str = "gossip_block"; -pub const GOSSIP_BLOBS_SIDECAR: &str = "gossip_blobs_sidecar"; +pub const GOSSIP_BLOCK_AND_BLOBS_SIDECAR: &str = "gossip_block_and_blobs_sidecar"; pub const DELAYED_IMPORT_BLOCK: &str = "delayed_import_block"; pub const GOSSIP_VOLUNTARY_EXIT: &str = "gossip_voluntary_exit"; pub const GOSSIP_PROPOSER_SLASHING: &str = "gossip_proposer_slashing"; @@ -210,7 +206,6 @@ pub const BLOCKS_BY_ROOTS_REQUEST: &str = "blocks_by_roots_request"; pub const BLOBS_BY_RANGE_REQUEST: &str = "blobs_by_range_request"; pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation"; pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate"; -pub const UNKNOWN_BLOBS_SIDECAR: &str = "unknown_blobs_sidecar"; /// A simple first-in-first-out queue with a maximum length. struct FifoQueue { @@ -415,19 +410,20 @@ impl WorkEvent { } /// Create a new `Work` event for some blobs sidecar. - pub fn gossip_blobs_sidecar( + pub fn gossip_block_and_blobs_sidecar( message_id: MessageId, peer_id: PeerId, - _peer_client: Client, - blobs: Arc>, + peer_client: Client, + block_and_blobs: Arc>, seen_timestamp: Duration, ) -> Self { Self { drop_during_sync: false, - work: Work::GossipBlobsSidecar { + work: Work::GossipBlockAndBlobsSidecar { message_id, peer_id, - blobs, + peer_client, + block_and_blobs, seen_timestamp, }, } @@ -674,20 +670,6 @@ impl std::convert::From> for WorkEvent { seen_timestamp, }, }, - ReadyWork::BlobsSidecar(QueuedBlobsSidecar { - peer_id, - message_id, - blobs_sidecar, - seen_timestamp, - }) => Self { - drop_during_sync: true, - work: Work::UnknownBlobsSidecar { - message_id, - peer_id, - blobs: blobs_sidecar, - seen_timestamp, - }, - }, } } } @@ -737,16 +719,11 @@ pub enum Work { block: Arc>, seen_timestamp: Duration, }, - GossipBlobsSidecar { + GossipBlockAndBlobsSidecar { message_id: MessageId, peer_id: PeerId, - blobs: Arc>, - seen_timestamp: Duration, - }, - UnknownBlobsSidecar { - message_id: MessageId, - peer_id: PeerId, - blobs: Arc>, + peer_client: Client, + block_and_blobs: Arc>, seen_timestamp: Duration, }, DelayedImportBlock { @@ -823,7 +800,7 @@ impl Work { Work::GossipAggregate { .. } => GOSSIP_AGGREGATE, Work::GossipAggregateBatch { .. } => GOSSIP_AGGREGATE_BATCH, Work::GossipBlock { .. } => GOSSIP_BLOCK, - Work::GossipBlobsSidecar { .. } => GOSSIP_BLOBS_SIDECAR, + Work::GossipBlockAndBlobsSidecar { .. } => GOSSIP_BLOCK_AND_BLOBS_SIDECAR, Work::DelayedImportBlock { .. } => DELAYED_IMPORT_BLOCK, Work::GossipVoluntaryExit { .. } => GOSSIP_VOLUNTARY_EXIT, Work::GossipProposerSlashing { .. } => GOSSIP_PROPOSER_SLASHING, @@ -838,7 +815,6 @@ impl Work { Work::BlobsByRangeRequest { .. } => BLOBS_BY_RANGE_REQUEST, Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION, Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE, - Work::UnknownBlobsSidecar { .. } => UNKNOWN_BLOBS_SIDECAR, } } } @@ -955,7 +931,6 @@ impl BeaconProcessor { LifoQueue::new(MAX_AGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN); let mut unknown_block_attestation_queue = LifoQueue::new(MAX_UNAGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN); - let mut unknown_blobs_sidecar_queue = LifoQueue::new(MAX_BLOBS_SIDECAR_REPROCESS_QUEUE_LEN); let mut sync_message_queue = LifoQueue::new(MAX_SYNC_MESSAGE_QUEUE_LEN); let mut sync_contribution_queue = LifoQueue::new(MAX_SYNC_CONTRIBUTION_QUEUE_LEN); @@ -976,7 +951,8 @@ impl BeaconProcessor { let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN); let mut backfill_chain_segment = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN); let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN); - let mut gossip_blobs_sidecar_queue = FifoQueue::new(MAX_GOSSIP_BLOB_QUEUE_LEN); + let mut gossip_block_and_blobs_sidecar_queue = + FifoQueue::new(MAX_GOSSIP_BLOCK_AND_BLOB_QUEUE_LEN); let mut delayed_block_queue = FifoQueue::new(MAX_DELAYED_BLOCK_QUEUE_LEN); let mut status_queue = FifoQueue::new(MAX_STATUS_QUEUE_LEN); @@ -1086,7 +1062,7 @@ impl BeaconProcessor { } else if let Some(item) = gossip_block_queue.pop() { self.spawn_worker(item, toolbox); //FIXME(sean) - } else if let Some(item) = gossip_blobs_sidecar_queue.pop() { + } else if let Some(item) = gossip_block_and_blobs_sidecar_queue.pop() { self.spawn_worker(item, toolbox); // Check the aggregates, *then* the unaggregates since we assume that // aggregates are more valuable to local validators and effectively give us @@ -1292,8 +1268,8 @@ impl BeaconProcessor { Work::GossipBlock { .. } => { gossip_block_queue.push(work, work_id, &self.log) } - Work::GossipBlobsSidecar { .. } => { - gossip_blobs_sidecar_queue.push(work, work_id, &self.log) + Work::GossipBlockAndBlobsSidecar { .. } => { + gossip_block_and_blobs_sidecar_queue.push(work, work_id, &self.log) } Work::DelayedImportBlock { .. } => { delayed_block_queue.push(work, work_id, &self.log) @@ -1337,9 +1313,6 @@ impl BeaconProcessor { Work::UnknownBlockAggregate { .. } => { unknown_block_aggregate_queue.push(work) } - Work::UnknownBlobsSidecar { .. } => { - unknown_blobs_sidecar_queue.push(work) - } } } } @@ -1556,19 +1529,22 @@ impl BeaconProcessor { /* * Verification for blobs sidecars received on gossip. */ - Work::GossipBlobsSidecar { + Work::GossipBlockAndBlobsSidecar { message_id, peer_id, - blobs, + peer_client, + block_and_blobs, seen_timestamp, } => task_spawner.spawn_async(async move { - worker.process_gossip_blob( - message_id, - peer_id, - blobs, - Some(work_reprocessing_tx), - seen_timestamp, - ) + worker + .process_gossip_block_and_blobs_sidecar( + message_id, + peer_id, + peer_client, + block_and_blobs, + seen_timestamp, + ) + .await }), /* * Import for blocks that we received earlier than their intended slot. @@ -1755,14 +1731,6 @@ impl BeaconProcessor { seen_timestamp, ) }), - Work::UnknownBlobsSidecar { - message_id, - peer_id, - blobs, - seen_timestamp, - } => task_spawner.spawn_blocking(move || { - worker.process_gossip_blob(message_id, peer_id, blobs, None, seen_timestamp) - }), }; } } diff --git a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs index b08542eeb5..2aeec11c32 100644 --- a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs +++ b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs @@ -30,10 +30,7 @@ use task_executor::TaskExecutor; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::time::error::Error as TimeError; use tokio_util::time::delay_queue::{DelayQueue, Key as DelayKey}; -use types::{ - Attestation, EthSpec, Hash256, SignedAggregateAndProof, SignedBeaconBlock, SignedBlobsSidecar, - SubnetId, -}; +use types::{Attestation, EthSpec, Hash256, SignedAggregateAndProof, SignedBeaconBlock, SubnetId}; const TASK_NAME: &str = "beacon_processor_reprocess_queue"; const GOSSIP_BLOCKS: &str = "gossip_blocks"; @@ -47,10 +44,6 @@ const ADDITIONAL_QUEUED_BLOCK_DELAY: Duration = Duration::from_millis(5); /// For how long to queue aggregated and unaggregated attestations for re-processing. pub const QUEUED_ATTESTATION_DELAY: Duration = Duration::from_secs(12); -/// For how long to queue blob sidecars for re-processing. -/// TODO: rethink duration -pub const QUEUED_BLOBS_SIDECARS_DELAY: Duration = Duration::from_secs(6); - /// For how long to queue rpc blocks before sending them back for reprocessing. pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(3); @@ -62,10 +55,6 @@ const MAXIMUM_QUEUED_BLOCKS: usize = 16; /// How many attestations we keep before new ones get dropped. const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384; -/// TODO: fix number -/// How many blobs we keep before new ones get dropped. -const MAXIMUM_QUEUED_BLOB_SIDECARS: usize = 16_384; - /// Messages that the scheduler can receive. pub enum ReprocessQueueMessage { /// A block that has been received early and we should queue for later processing. @@ -80,8 +69,6 @@ pub enum ReprocessQueueMessage { UnknownBlockUnaggregate(QueuedUnaggregate), /// An aggregated attestation that references an unknown block. UnknownBlockAggregate(QueuedAggregate), - /// A blob sidecar that references an unknown block. - UnknownBlobSidecar(QueuedBlobsSidecar), } /// Events sent by the scheduler once they are ready for re-processing. @@ -90,7 +77,6 @@ pub enum ReadyWork { RpcBlock(QueuedRpcBlock), Unaggregate(QueuedUnaggregate), Aggregate(QueuedAggregate), - BlobsSidecar(QueuedBlobsSidecar), } /// An Attestation for which the corresponding block was not seen while processing, queued for @@ -132,15 +118,6 @@ pub struct QueuedRpcBlock { pub should_process: bool, } -/// A blob sidecar for which the corresponding block was not seen while processing, queued for -/// later. -pub struct QueuedBlobsSidecar { - pub peer_id: PeerId, - pub message_id: MessageId, - pub blobs_sidecar: Arc>, - pub seen_timestamp: Duration, -} - /// Unifies the different messages processed by the block delay queue. enum InboundEvent { /// A gossip block that was queued for later processing and is ready for import. @@ -150,8 +127,6 @@ enum InboundEvent { ReadyRpcBlock(QueuedRpcBlock), /// An aggregated or unaggregated attestation is ready for re-processing. ReadyAttestation(QueuedAttestationId), - /// A blob sidecar is ready for re-processing. - ReadyBlobsSidecar(QueuedBlobsSidecarId), /// A `DelayQueue` returned an error. DelayQueueError(TimeError, &'static str), /// A message sent to the `ReprocessQueue` @@ -172,7 +147,6 @@ struct ReprocessQueue { rpc_block_delay_queue: DelayQueue>, /// Queue to manage scheduled attestations. attestations_delay_queue: DelayQueue, - blobs_sidecar_delay_queue: DelayQueue, /* Queued items */ /// Queued blocks. @@ -181,19 +155,15 @@ struct ReprocessQueue { queued_aggregates: FnvHashMap, DelayKey)>, /// Queued attestations. queued_unaggregates: FnvHashMap, DelayKey)>, - queued_blob_sidecars: FnvHashMap, DelayKey)>, /// Attestations (aggregated and unaggregated) per root. awaiting_attestations_per_root: HashMap>, - awaiting_blobs_sidecars_per_root: HashMap>, /* Aux */ /// Next attestation id, used for both aggregated and unaggregated attestations next_attestation: usize, - next_sidecar: usize, early_block_debounce: TimeLatch, rpc_block_debounce: TimeLatch, attestation_delay_debounce: TimeLatch, - blobs_sidecar_debounce: TimeLatch, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -202,9 +172,6 @@ enum QueuedAttestationId { Unaggregate(usize), } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -struct QueuedBlobsSidecarId(usize); - impl QueuedAggregate { pub fn beacon_block_root(&self) -> &Hash256 { &self.attestation.message.aggregate.data.beacon_block_root @@ -268,21 +235,6 @@ impl Stream for ReprocessQueue { Poll::Ready(None) | Poll::Pending => (), } - match self.blobs_sidecar_delay_queue.poll_expired(cx) { - Poll::Ready(Some(Ok(id))) => { - return Poll::Ready(Some(InboundEvent::ReadyBlobsSidecar(id.into_inner()))); - } - Poll::Ready(Some(Err(e))) => { - return Poll::Ready(Some(InboundEvent::DelayQueueError( - e, - "blobs_sidecar_queue", - ))); - } - // `Poll::Ready(None)` means that there are no more entries in the delay queue and we - // will continue to get this result until something else is added into the queue. - Poll::Ready(None) | Poll::Pending => (), - } - // Last empty the messages channel. match self.work_reprocessing_rx.poll_recv(cx) { Poll::Ready(Some(message)) => return Poll::Ready(Some(InboundEvent::Msg(message))), @@ -312,19 +264,14 @@ pub fn spawn_reprocess_scheduler( gossip_block_delay_queue: DelayQueue::new(), rpc_block_delay_queue: DelayQueue::new(), attestations_delay_queue: DelayQueue::new(), - blobs_sidecar_delay_queue: DelayQueue::new(), queued_gossip_block_roots: HashSet::new(), queued_aggregates: FnvHashMap::default(), queued_unaggregates: FnvHashMap::default(), - queued_blob_sidecars: FnvHashMap::default(), awaiting_attestations_per_root: HashMap::new(), - awaiting_blobs_sidecars_per_root: HashMap::new(), next_attestation: 0, - next_sidecar: 0, early_block_debounce: TimeLatch::default(), rpc_block_debounce: TimeLatch::default(), attestation_delay_debounce: TimeLatch::default(), - blobs_sidecar_debounce: TimeLatch::default(), }; executor.spawn( @@ -526,39 +473,6 @@ impl ReprocessQueue { self.next_attestation += 1; } - InboundEvent::Msg(UnknownBlobSidecar(queued_blob_sidecar)) => { - if self.blobs_sidecar_delay_queue.len() >= MAXIMUM_QUEUED_BLOB_SIDECARS { - if self.blobs_sidecar_debounce.elapsed() { - error!( - log, - "Blobs sidecar queue is full"; - "queue_size" => MAXIMUM_QUEUED_BLOB_SIDECARS, - "msg" => "check system clock" - ); - } - // Drop the attestation. - return; - } - - let id = QueuedBlobsSidecarId(self.next_sidecar); - - // Register the delay. - let delay_key = self - .blobs_sidecar_delay_queue - .insert(id, QUEUED_BLOBS_SIDECARS_DELAY); - - // Register this sidecar for the corresponding root. - self.awaiting_blobs_sidecars_per_root - .entry(queued_blob_sidecar.blobs_sidecar.message.beacon_block_root) - .or_default() - .push(id); - - // Store the blob sidecar and its info. - self.queued_blob_sidecars - .insert(self.next_sidecar, (queued_blob_sidecar, delay_key)); - - self.next_sidecar += 1; - } InboundEvent::Msg(BlockImported(root)) => { // Unqueue the attestations we have for this root, if any. if let Some(queued_ids) = self.awaiting_attestations_per_root.remove(&root) { @@ -603,43 +517,6 @@ impl ReprocessQueue { } } } - // Unqueue the blob sidecars we have for this root, if any. - // TODO: merge the 2 data structures. - if let Some(queued_ids) = self.awaiting_blobs_sidecars_per_root.remove(&root) { - for id in queued_ids { - // metrics::inc_counter( - // &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_ATTESTATIONS, - // ); - - if let Some((work, delay_key)) = self - .queued_blob_sidecars - .remove(&id.0) - .map(|(blobs_sidecar, delay_key)| { - (ReadyWork::BlobsSidecar(blobs_sidecar), delay_key) - }) - { - // Remove the delay. - self.blobs_sidecar_delay_queue.remove(&delay_key); - - // Send the work. - if self.ready_work_tx.try_send(work).is_err() { - error!( - log, - "Failed to send scheduled blob sidecar"; - ); - } - } else { - // There is a mismatch between the blob sidecar ids registered for this - // root and the queued blob sidecars. This should never happen. - error!( - log, - "Unknown queued blob sidecar for block root"; - "block_root" => ?root, - "id" => ?id, - ); - } - } - } } // A block that was queued for later processing is now ready to be processed. InboundEvent::ReadyGossipBlock(ready_block) => { @@ -714,40 +591,6 @@ impl ReprocessQueue { } } } - InboundEvent::ReadyBlobsSidecar(queued_blobs_sidecar_id) => { - // metrics::inc_counter( - // &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_ATTESTATIONS, - // ); - - if let Some((root, work)) = self - .queued_blob_sidecars - .remove(&queued_blobs_sidecar_id.0) - .map(|(blobs_sidecar, _delay_key)| { - ( - blobs_sidecar.blobs_sidecar.message.beacon_block_root, - ReadyWork::BlobsSidecar(blobs_sidecar), - ) - }) - { - if self.ready_work_tx.try_send(work).is_err() { - error!( - log, - "Failed to send scheduled attestation"; - ); - } - - if let Some(queued_blob_sidecars) = - self.awaiting_blobs_sidecars_per_root.get_mut(&root) - { - if let Some(index) = queued_blob_sidecars - .iter() - .position(|&id| id == queued_blobs_sidecar_id) - { - queued_blob_sidecars.swap_remove(index); - } - } - } - } } metrics::set_gauge_vec( diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 592b92b038..5f13e27d7b 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -1,4 +1,3 @@ -use crate::beacon_processor::work_reprocessing_queue::QueuedBlobsSidecar; use crate::{metrics, service::NetworkMessage, sync::SyncMessage}; use beacon_chain::store::Error; @@ -11,7 +10,10 @@ use beacon_chain::{ BeaconChainError, BeaconChainTypes, BlockError, CountUnrealized, ForkChoiceError, GossipVerifiedBlock, }; -use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource}; +use lighthouse_network::{ + Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource, + SignedBeaconBlockAndBlobsSidecar, +}; use slog::{crit, debug, error, info, trace, warn}; use slot_clock::SlotClock; use ssz::Encode; @@ -19,11 +21,10 @@ use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use store::hot_cold_store::HotColdDBError; use tokio::sync::mpsc; -use types::signed_blobs_sidecar::SignedBlobsSidecar; use types::{ - Attestation, AttesterSlashing, EthSpec, Hash256, IndexedAttestation, ProposerSlashing, - SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, - Slot, SubnetId, SyncCommitteeMessage, SyncSubnetId, + Attestation, AttesterSlashing, BlobsSidecar, EthSpec, Hash256, IndexedAttestation, + ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, + SignedVoluntaryExit, Slot, SubnetId, SyncCommitteeMessage, SyncSubnetId, }; use super::{ @@ -697,30 +698,15 @@ impl Worker { } #[allow(clippy::too_many_arguments)] - pub fn process_gossip_blob( + pub async fn process_gossip_block_and_blobs_sidecar( self, message_id: MessageId, peer_id: PeerId, - blob: Arc>, - reprocess_tx: Option>>, + peer_client: Client, + block_and_blob: Arc>, seen_timestamp: Duration, ) { - match self.chain.verify_blobs_sidecar_for_gossip(&blob) { - //FIXME(sean) - Ok(verified_sidecar) => { - // Register with validator monitor - // Propagate - // Apply to fork choice - } - Err(error) => self.handle_blobs_verification_failure( - peer_id, - message_id, - reprocess_tx, - error, - blob, - seen_timestamp, - ), - }; + unimplemented!() } /// Process the beacon block received from the gossip network and @@ -2235,82 +2221,7 @@ impl Worker { message_id: MessageId, reprocess_tx: Option>>, error: BlobError, - blobs_sidecar: Arc>, seen_timestamp: Duration, ) { - // TODO: metrics - match &error { - BlobError::FutureSlot { .. } => { - self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); - } - BlobError::PastSlot { .. } => { - self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); - } - BlobError::BeaconChainError(_e) => { - self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); - } - BlobError::BlobOutOfRange { blob_index: _ } => { - self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); - } - BlobError::InvalidKZGCommitment => { - self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); - } - BlobError::ProposalSignatureInvalid => { - self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); - } - BlobError::RepeatSidecar { - proposer: _, - slot: _, - } => { - self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); - } - BlobError::UnknownHeadBlock { beacon_block_root } => { - debug!( - self.log, - "Blob sidecar for unknown block"; - "peer_id" => %peer_id, - "block" => ?beacon_block_root - ); - if let Some(sender) = reprocess_tx { - // We don't know the block, get the sync manager to handle the block lookup, and - // send the attestation to be scheduled for re-processing. - self.sync_tx - .send(SyncMessage::UnknownBlockHash(peer_id, *beacon_block_root)) - .unwrap_or_else(|_| { - warn!( - self.log, - "Failed to send to sync service"; - "msg" => "UnknownBlockHash" - ) - }); - let msg = ReprocessQueueMessage::UnknownBlobSidecar(QueuedBlobsSidecar { - peer_id, - message_id, - blobs_sidecar, - seen_timestamp, - }); - - if sender.try_send(msg).is_err() { - error!( - self.log, - "Failed to send blob sidecar for re-processing"; - ) - } - } else { - // We shouldn't make any further attempts to process this attestation. - // - // Don't downscore the peer since it's not clear if we requested this head - // block from them or not. - self.propagate_validation_result( - message_id, - peer_id, - MessageAcceptance::Ignore, - ); - } - - return; - } - &BlobError::UnknownValidator(_) => todo!(), - } } } diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index 24a202c497..cb90813b26 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -236,12 +236,12 @@ impl Router { block, ); } - PubsubMessage::BlobsSidecars(blobs) => { - self.processor.on_blobs_gossip( + PubsubMessage::BeaconBlockAndBlobsSidecars(block_and_blobs) => { + self.processor.on_block_and_blobs_sidecar_gossip( id, peer_id, self.network_globals.client(&peer_id), - blobs, + block_and_blobs, ); } PubsubMessage::VoluntaryExit(exit) => { diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index c2cf483d97..dadaf60c1e 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -7,7 +7,7 @@ use crate::sync::manager::RequestId as SyncId; use crate::sync::SyncMessage; use beacon_chain::{BeaconChain, BeaconChainTypes}; use lighthouse_network::rpc::methods::BlobsByRangeRequest; -use lighthouse_network::rpc::*; +use lighthouse_network::{rpc::*, SignedBeaconBlockAndBlobsSidecar}; use lighthouse_network::{ Client, MessageId, NetworkGlobals, PeerId, PeerRequestId, Request, Response, }; @@ -17,7 +17,6 @@ use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use store::SyncCommitteeMessage; use tokio::sync::mpsc; -use types::signed_blobs_sidecar::SignedBlobsSidecar; use types::{ Attestation, AttesterSlashing, BlobsSidecar, EthSpec, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, @@ -295,18 +294,18 @@ impl Processor { )) } - pub fn on_blobs_gossip( + pub fn on_block_and_blobs_sidecar_gossip( &mut self, message_id: MessageId, peer_id: PeerId, peer_client: Client, - blobs: Arc>, + block_and_blobs: Arc>, ) { - self.send_beacon_processor_work(BeaconWorkEvent::gossip_blobs_sidecar( + self.send_beacon_processor_work(BeaconWorkEvent::gossip_block_and_blobs_sidecar( message_id, peer_id, peer_client, - blobs, + block_and_blobs, timestamp_now(), )) } diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index cfba40c0bf..c0fbef973f 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -38,7 +38,6 @@ use std::marker::PhantomData; use std::path::Path; use std::sync::Arc; use std::time::Duration; -use types::signed_blobs_sidecar::SignedBlobsSidecar; use types::*; /// On-disk database that stores finalized states efficiently. @@ -62,7 +61,7 @@ pub struct HotColdDB, Cold: ItemStore> { /// The hot database also contains all blocks. pub hot_db: Hot, /// LRU cache of deserialized blobs. Updated whenever a blob is loaded. - blob_cache: Mutex>>, + blob_cache: Mutex>>, /// LRU cache of deserialized blocks. Updated whenever a block is loaded. block_cache: Mutex>>, /// Chain spec. @@ -480,11 +479,7 @@ impl, Cold: ItemStore> HotColdDB .key_delete(DBColumn::ExecPayload.into(), block_root.as_bytes()) } - pub fn put_blobs( - &self, - block_root: &Hash256, - blobs: SignedBlobsSidecar, - ) -> Result<(), Error> { + pub fn put_blobs(&self, block_root: &Hash256, blobs: BlobsSidecar) -> Result<(), Error> { self.hot_db.put_bytes( DBColumn::BeaconBlob.into(), block_root.as_bytes(), @@ -494,7 +489,7 @@ impl, Cold: ItemStore> HotColdDB Ok(()) } - pub fn get_blobs(&self, block_root: &Hash256) -> Result>, Error> { + pub fn get_blobs(&self, block_root: &Hash256) -> Result>, Error> { if let Some(blobs) = self.blob_cache.lock().get(block_root) { Ok(Some(blobs.clone())) } else { @@ -502,7 +497,7 @@ impl, Cold: ItemStore> HotColdDB .hot_db .get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? { - let ret = SignedBlobsSidecar::from_ssz_bytes(&bytes)?; + let ret = BlobsSidecar::from_ssz_bytes(&bytes)?; self.blob_cache.lock().put(*block_root, ret.clone()); Ok(Some(ret)) } else { @@ -514,7 +509,7 @@ impl, Cold: ItemStore> HotColdDB pub fn blobs_as_kv_store_ops( &self, key: &Hash256, - blobs: &SignedBlobsSidecar, + blobs: &BlobsSidecar, ops: &mut Vec, ) { let db_key = get_key_for_col(DBColumn::BeaconBlob.into(), key.as_bytes()); diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 00e37a18ec..d9041dd636 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -41,7 +41,6 @@ pub use metrics::scrape_for_metrics; use parking_lot::MutexGuard; use std::sync::Arc; use strum::{EnumString, IntoStaticStr}; -use types::signed_blobs_sidecar::SignedBlobsSidecar; pub use types::*; pub type ColumnIter<'a> = Box), Error>> + 'a>; @@ -156,7 +155,7 @@ pub trait ItemStore: KeyValueStore + Sync + Send + Sized + 'stati pub enum StoreOp<'a, E: EthSpec> { PutBlock(Hash256, Arc>), PutState(Hash256, &'a BeaconState), - PutBlobs(Hash256, Arc>), + PutBlobs(Hash256, Arc>), PutStateSummary(Hash256, HotStateSummary), PutStateTemporaryFlag(Hash256), DeleteStateTemporaryFlag(Hash256), diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 81d6e7051b..a6581b0ed4 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -608,7 +608,7 @@ impl BeaconNodeHttpClient { /// Returns `Ok(None)` on a 404 error. pub async fn post_beacon_blobs( &self, - block: &SignedBlobsSidecar, + block: &BlobsSidecar, ) -> Result<(), Error> { let mut path = self.eth_path(V1)?; diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index 39ddbf6dee..cc839001dd 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -93,7 +93,6 @@ pub mod sqlite; pub mod blobs_sidecar; pub mod kzg_commitment; pub mod kzg_proof; -pub mod signed_blobs_sidecar; use ethereum_types::{H160, H256}; @@ -166,7 +165,6 @@ pub use crate::signed_beacon_block::{ SignedBlindedBeaconBlock, }; pub use crate::signed_beacon_block_header::SignedBeaconBlockHeader; -pub use crate::signed_blobs_sidecar::SignedBlobsSidecar; pub use crate::signed_contribution_and_proof::SignedContributionAndProof; pub use crate::signed_voluntary_exit::SignedVoluntaryExit; pub use crate::signing_data::{SignedRoot, SigningData}; diff --git a/consensus/types/src/signed_blobs_sidecar.rs b/consensus/types/src/signed_blobs_sidecar.rs deleted file mode 100644 index 677b95bd38..0000000000 --- a/consensus/types/src/signed_blobs_sidecar.rs +++ /dev/null @@ -1,58 +0,0 @@ -use crate::{ - signing_data::SignedRoot, BlobsSidecar, ChainSpec, Domain, EthSpec, Fork, Hash256, PublicKey, - SigningData, -}; -use bls::Signature; -use serde_derive::{Deserialize, Serialize}; -use ssz_derive::{Decode, Encode}; -use tree_hash::TreeHash; -use tree_hash_derive::TreeHash; - -#[cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary))] -#[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode, TreeHash, PartialEq)] -#[serde(bound = "T: EthSpec")] -pub struct SignedBlobsSidecar { - pub message: BlobsSidecar, - pub signature: Signature, -} - -impl SignedBlobsSidecar { - pub fn from_blob(blob: BlobsSidecar, signature: Signature) -> Self { - Self { - message: blob, - signature, - } - } - - /// Verify `self.signature`. - /// - /// If the root of `blob_sidecar.message` is already known it can be passed in via `object_root_opt`. - /// Otherwise, it will be computed locally. - pub fn verify_signature( - &self, - object_root_opt: Option, - pubkey: &PublicKey, - fork: &Fork, - genesis_validators_root: Hash256, - spec: &ChainSpec, - ) -> bool { - let domain = spec.get_domain( - self.message.beacon_block_slot.epoch(T::slots_per_epoch()), - Domain::BlobsSideCar, - fork, - genesis_validators_root, - ); - - let message = if let Some(object_root) = object_root_opt { - SigningData { - object_root, - domain, - } - .tree_hash_root() - } else { - self.message.signing_root(domain) - }; - - self.signature.verify(pubkey, message) - } -} diff --git a/validator_client/src/block_service.rs b/validator_client/src/block_service.rs index 29926f3780..2a50b6d2ea 100644 --- a/validator_client/src/block_service.rs +++ b/validator_client/src/block_service.rs @@ -319,284 +319,126 @@ impl BlockService { let proposer_index = self.validator_store.validator_index(&validator_pubkey); let validator_pubkey_ref = &validator_pubkey; - match self.context.eth2_config.spec.fork_name_at_slot::(slot) { - ForkName::Base | ForkName::Altair | ForkName::Merge => { - // Request block from first responsive beacon node. - let block = self - .beacon_nodes - .first_success( - RequireSynced::No, - OfflineOnFailure::Yes, - |beacon_node| async move { - let block = match Payload::block_type() { - BlockType::Full => { - let _get_timer = metrics::start_timer_vec( - &metrics::BLOCK_SERVICE_TIMES, - &[metrics::BEACON_BLOCK_HTTP_GET], - ); - beacon_node - .get_validator_blocks::( - slot, - randao_reveal_ref, - graffiti.as_ref(), - ) - .await - .map_err(|e| { - BlockError::Recoverable(format!( - "Error from beacon node when producing block: {:?}", - e - )) - })? - .data - } - BlockType::Blinded => { - let _get_timer = metrics::start_timer_vec( - &metrics::BLOCK_SERVICE_TIMES, - &[metrics::BLINDED_BEACON_BLOCK_HTTP_GET], - ); - beacon_node - .get_validator_blinded_blocks::( - slot, - randao_reveal_ref, - graffiti.as_ref(), - ) - .await - .map_err(|e| { - BlockError::Recoverable(format!( - "Error from beacon node when producing block: {:?}", - e - )) - })? - .data - } - }; + // Request block from first responsive beacon node. + let block = self + .beacon_nodes + .first_success( + RequireSynced::No, + OfflineOnFailure::Yes, + |beacon_node| async move { + let block = match Payload::block_type() { + BlockType::Full => { + let _get_timer = metrics::start_timer_vec( + &metrics::BLOCK_SERVICE_TIMES, + &[metrics::BEACON_BLOCK_HTTP_GET], + ); + beacon_node + .get_validator_blocks::( + slot, + randao_reveal_ref, + graffiti.as_ref(), + ) + .await + .map_err(|e| { + BlockError::Recoverable(format!( + "Error from beacon node when producing block: {:?}", + e + )) + })? + .data + } + BlockType::Blinded => { + let _get_timer = metrics::start_timer_vec( + &metrics::BLOCK_SERVICE_TIMES, + &[metrics::BLINDED_BEACON_BLOCK_HTTP_GET], + ); + beacon_node + .get_validator_blinded_blocks::( + slot, + randao_reveal_ref, + graffiti.as_ref(), + ) + .await + .map_err(|e| { + BlockError::Recoverable(format!( + "Error from beacon node when producing block: {:?}", + e + )) + })? + .data + } + }; - if proposer_index != Some(block.proposer_index()) { - return Err(BlockError::Recoverable( - "Proposer index does not match block proposer. Beacon chain re-orged" - .to_string(), - )); - } + if proposer_index != Some(block.proposer_index()) { + return Err(BlockError::Recoverable( + "Proposer index does not match block proposer. Beacon chain re-orged" + .to_string(), + )); + } - Ok::<_, BlockError>(block) - }, - ) - .await?; + Ok::<_, BlockError>(block) + }, + ) + .await?; - let signed_block = self_ref - .validator_store - .sign_block::(*validator_pubkey_ref, block, current_slot) - .await - .map_err(|e| { - BlockError::Recoverable(format!("Unable to sign block: {:?}", e)) - })?; - - // Publish block with first available beacon node. - self.beacon_nodes - .first_success( - RequireSynced::No, - OfflineOnFailure::Yes, - |beacon_node| async { - match Payload::block_type() { - BlockType::Full => { - let _post_timer = metrics::start_timer_vec( - &metrics::BLOCK_SERVICE_TIMES, - &[metrics::BEACON_BLOCK_HTTP_POST], - ); - beacon_node - .post_beacon_blocks(&signed_block) - .await - .map_err(|e| { - BlockError::Irrecoverable(format!( - "Error from beacon node when publishing block: {:?}", - e - )) - })? - } - BlockType::Blinded => { - let _post_timer = metrics::start_timer_vec( - &metrics::BLOCK_SERVICE_TIMES, - &[metrics::BLINDED_BEACON_BLOCK_HTTP_POST], - ); - beacon_node - .post_beacon_blinded_blocks(&signed_block) - .await - .map_err(|e| { - BlockError::Irrecoverable(format!( - "Error from beacon node when publishing block: {:?}", - e - )) - })? - } - } - Ok::<_, BlockError>(()) - }, - ) - .await?; - - info!( - log, - "Successfully published block"; - "block_type" => ?Payload::block_type(), - "deposits" => signed_block.message().body().deposits().len(), - "attestations" => signed_block.message().body().attestations().len(), - "graffiti" => ?graffiti.map(|g| g.as_utf8_lossy()), - "slot" => signed_block.slot().as_u64(), - ); - } - ForkName::Capella | ForkName::Eip4844 => { - if matches!(Payload::block_type(), BlockType::Blinded) { - //FIXME(sean) - crit!( - log, - "`--builder-payloads` not yet supported for EIP-4844 fork" - ); - return Ok(()); - } - - // Request block from first responsive beacon node. - let block_and_blobs = self - .beacon_nodes - .first_success( - RequireSynced::No, - OfflineOnFailure::Yes, - |beacon_node| async move { - - let _get_timer = metrics::start_timer_vec( - &metrics::BLOCK_SERVICE_TIMES, - &[metrics::BEACON_BLOCK_HTTP_GET], - ); - let block_and_blobs = beacon_node - .get_validator_blocks_and_blobs::( - slot, - randao_reveal_ref, - graffiti.as_ref(), - ) - .await - .map_err(|e| { - BlockError::Recoverable(format!( - "Error from beacon node when producing block: {:?}", - e - )) - })? - .data; - - if proposer_index != Some(block_and_blobs.block.proposer_index()) { - return Err(BlockError::Recoverable( - "Proposer index does not match block proposer. Beacon chain re-orged" - .to_string(), - )); - } - - Ok::<_, BlockError>(block_and_blobs) - }, - ) - .await?; - - let blobs_sidecar = BlobsSidecar { - beacon_block_root: block_and_blobs.block.canonical_root(), - beacon_block_slot: block_and_blobs.block.slot(), - blobs: VariableList::from(block_and_blobs.blobs), - kzg_aggregate_proof: block_and_blobs.kzg_aggregate_proof, - }; - - let block = block_and_blobs.block; - let block_publish_future = async { - let signed_block = self_ref - .validator_store - .sign_block::(*validator_pubkey_ref, block, current_slot) - .await - .map_err(|e| { - BlockError::Recoverable(format!("Unable to sign block: {:?}", e)) - })?; - - // Publish block with first available beacon node. - self.beacon_nodes - .first_success( - RequireSynced::No, - OfflineOnFailure::Yes, - |beacon_node| async { - let _post_timer = metrics::start_timer_vec( - &metrics::BLOCK_SERVICE_TIMES, - &[metrics::BEACON_BLOCK_HTTP_POST], - ); - beacon_node - .post_beacon_blocks(&signed_block) - .await - .map_err(|e| { - BlockError::Irrecoverable(format!( - "Error from beacon node when publishing block: {:?}", - e - )) - })?; - Ok::<_, BlockError>(()) - }, - ) - .await?; - - info!( - log, - "Successfully published block"; - "block_type" => ?Payload::block_type(), - "deposits" => signed_block.message().body().deposits().len(), - "attestations" => signed_block.message().body().attestations().len(), - "graffiti" => ?graffiti.map(|g| g.as_utf8_lossy()), - "slot" => signed_block.slot().as_u64(), - ); + let signed_block = self_ref + .validator_store + .sign_block::(*validator_pubkey_ref, block, current_slot) + .await + .map_err(|e| BlockError::Recoverable(format!("Unable to sign block: {:?}", e)))?; + // Publish block with first available beacon node. + self.beacon_nodes + .first_success( + RequireSynced::No, + OfflineOnFailure::Yes, + |beacon_node| async { + match Payload::block_type() { + BlockType::Full => { + let _post_timer = metrics::start_timer_vec( + &metrics::BLOCK_SERVICE_TIMES, + &[metrics::BEACON_BLOCK_HTTP_POST], + ); + beacon_node + .post_beacon_blocks(&signed_block) + .await + .map_err(|e| { + BlockError::Irrecoverable(format!( + "Error from beacon node when publishing block: {:?}", + e + )) + })? + } + BlockType::Blinded => { + let _post_timer = metrics::start_timer_vec( + &metrics::BLOCK_SERVICE_TIMES, + &[metrics::BLINDED_BEACON_BLOCK_HTTP_POST], + ); + beacon_node + .post_beacon_blinded_blocks(&signed_block) + .await + .map_err(|e| { + BlockError::Irrecoverable(format!( + "Error from beacon node when publishing block: {:?}", + e + )) + })? + } + } Ok::<_, BlockError>(()) - }; + }, + ) + .await?; - let blob_publish_future = async { - let signed_blobs = self_ref - .validator_store - .sign_blobs(*validator_pubkey_ref, blobs_sidecar, current_slot) - .await - .map_err(|e| { - BlockError::Recoverable(format!("Unable to sign blob: {:?}", e)) - })?; - - // Publish block with first available beacon node. - self.beacon_nodes - .first_success( - RequireSynced::No, - OfflineOnFailure::Yes, - |beacon_node| async { - let _post_timer = metrics::start_timer_vec( - &metrics::BLOCK_SERVICE_TIMES, - &[metrics::BEACON_BLOB_HTTP_POST], - ); - beacon_node.post_beacon_blobs(&signed_blobs).await.map_err( - |e| { - BlockError::Irrecoverable(format!( - "Error from beacon node when publishing blob: {:?}", - e - )) - }, - )?; - Ok::<_, BlockError>(()) - }, - ) - .await?; - - info!( - log, - "Successfully published blobs"; - "block_type" => ?Payload::block_type(), - "slot" => signed_blobs.message.beacon_block_slot.as_u64(), - "block_root" => ?signed_blobs.message.beacon_block_root, - "blobs_len" => signed_blobs.message.blobs.len(), - ); - - Ok::<_, BlockError>(()) - }; - - let (res_block, res_blob) = tokio::join!(block_publish_future, blob_publish_future); - - res_block?; - res_blob?; - } - } + info!( + log, + "Successfully published block"; + "block_type" => ?Payload::block_type(), + "deposits" => signed_block.message().body().deposits().len(), + "attestations" => signed_block.message().body().attestations().len(), + "graffiti" => ?graffiti.map(|g| g.as_utf8_lossy()), + "slot" => signed_block.slot().as_u64(), + ); Ok(()) } diff --git a/validator_client/src/validator_store.rs b/validator_client/src/validator_store.rs index 306f22a2fb..692365aece 100644 --- a/validator_client/src/validator_store.rs +++ b/validator_client/src/validator_store.rs @@ -22,9 +22,9 @@ use types::{ AggregateAndProof, Attestation, BeaconBlock, BlindedPayload, BlobsSidecar, ChainSpec, ContributionAndProof, Domain, Epoch, EthSpec, ExecPayload, Fork, FullPayload, Graffiti, Hash256, Keypair, PublicKeyBytes, SelectionProof, Signature, SignedAggregateAndProof, - SignedBeaconBlock, SignedBlobsSidecar, SignedContributionAndProof, SignedRoot, - SignedValidatorRegistrationData, Slot, SyncAggregatorSelectionData, SyncCommitteeContribution, - SyncCommitteeMessage, SyncSelectionProof, SyncSubnetId, ValidatorRegistrationData, + SignedBeaconBlock, SignedContributionAndProof, SignedRoot, SignedValidatorRegistrationData, + Slot, SyncAggregatorSelectionData, SyncCommitteeContribution, SyncCommitteeMessage, + SyncSelectionProof, SyncSubnetId, ValidatorRegistrationData, }; use validator_dir::ValidatorDir; @@ -532,42 +532,6 @@ impl ValidatorStore { } } - pub async fn sign_blobs( - &self, - validator_pubkey: PublicKeyBytes, - blobs_sidecar: BlobsSidecar, - current_slot: Slot, - ) -> Result, Error> { - let slot = blobs_sidecar.beacon_block_slot; - - // Make sure the blob slot is not higher than the current slot to avoid potential attacks. - if slot > current_slot { - warn!( - self.log, - "Not signing blob with slot greater than current slot"; - "blob_slot" => slot.as_u64(), - "current_slot" => current_slot.as_u64() - ); - return Err(Error::GreaterThanCurrentSlot { slot, current_slot }); - } - - let signing_epoch = slot.epoch(E::slots_per_epoch()); - let signing_context = self.signing_context(Domain::BlobsSideCar, signing_epoch); - - metrics::inc_counter_vec(&metrics::SIGNED_BLOBS_TOTAL, &[metrics::SUCCESS]); - - let signing_method = self.doppelganger_checked_signing_method(validator_pubkey)?; - let signature = signing_method - .get_signature::>( - SignableMessage::BlobsSidecar(&blobs_sidecar), - signing_context, - &self.spec, - &self.task_executor, - ) - .await?; - Ok(SignedBlobsSidecar::from_blob(blobs_sidecar, signature)) - } - pub async fn sign_attestation( &self, validator_pubkey: PublicKeyBytes,