From 7162e5e23b8bc020acf39d62bd29578a7b006924 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Tue, 15 Nov 2022 16:43:56 -0500 Subject: [PATCH] add a bunch of blob coupling boiler plate, add a blobs by root request --- beacon_node/beacon_chain/src/beacon_chain.rs | 50 ++++------ .../beacon_chain/src/blob_verification.rs | 94 ++++++++----------- .../beacon_chain/src/block_verification.rs | 56 ++++++----- beacon_node/lighthouse_network/src/lib.rs | 1 - .../src/peer_manager/mod.rs | 3 + .../src/rpc/codec/ssz_snappy.rs | 68 ++++++++++---- .../lighthouse_network/src/rpc/methods.rs | 22 ++++- beacon_node/lighthouse_network/src/rpc/mod.rs | 1 + .../lighthouse_network/src/rpc/outbound.rs | 8 ++ .../lighthouse_network/src/rpc/protocol.rs | 28 ++++-- .../src/rpc/rate_limiter.rs | 13 +++ .../src/service/api_types.rs | 13 ++- .../lighthouse_network/src/service/mod.rs | 15 +++ .../lighthouse_network/src/types/mod.rs | 2 +- .../lighthouse_network/src/types/pubsub.rs | 9 +- .../network/src/beacon_processor/mod.rs | 31 ++++-- .../beacon_processor/worker/gossip_methods.rs | 25 +---- beacon_node/network/src/router/mod.rs | 7 ++ beacon_node/network/src/router/processor.rs | 22 +++-- .../network/src/sync/block_lookups/mod.rs | 4 +- .../src/sync/block_lookups/parent_lookup.rs | 11 ++- .../sync/block_lookups/single_block_lookup.rs | 3 +- beacon_node/network/src/sync/manager.rs | 27 ++---- .../network/src/sync/network_context.rs | 3 + consensus/types/src/signed_block_and_blobs.rs | 29 +++++- 25 files changed, 334 insertions(+), 211 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index a7d0fe5c6c..1f882de71d 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -6,7 +6,6 @@ use crate::attestation_verification::{ use crate::attester_cache::{AttesterCache, AttesterCacheKey}; use crate::beacon_proposer_cache::compute_proposer_duties_from_head; use 
crate::beacon_proposer_cache::BeaconProposerCache; -use crate::blob_verification::{BlobError, VerifiedBlobsSidecar}; use crate::block_times_cache::BlockTimesCache; use crate::block_verification::{ check_block_is_finalized_descendant, check_block_relevancy, get_block_root, @@ -103,11 +102,12 @@ use task_executor::{ShutdownReason, TaskExecutor}; use tree_hash::TreeHash; use types::beacon_state::CloneConfig; use types::*; +use types::signed_block_and_blobs::BlockMaybeBlobs; pub type ForkChoiceError = fork_choice::Error; /// Alias to appease clippy. -type HashBlockTuple = (Hash256, Arc>); +type HashBlockTuple = (Hash256, BlockMaybeBlobs); /// The time-out before failure during an operation to take a read/write RwLock on the block /// processing cache. @@ -1784,23 +1784,6 @@ impl BeaconChain { }) } - /// Accepts some `BlobsSidecar` received over from the network and attempts to verify it, - /// returning `Ok(_)` if it is valid to be (re)broadcast on the gossip network. - pub fn verify_blobs_sidecar_for_gossip<'a>( - &self, - blobs_sidecar: &'a BlobsSidecar, - ) -> Result, BlobError> { - metrics::inc_counter(&metrics::BLOBS_SIDECAR_PROCESSING_REQUESTS); - let _timer = metrics::start_timer(&metrics::BLOBS_SIDECAR_GOSSIP_VERIFICATION_TIMES); - VerifiedBlobsSidecar::verify(blobs_sidecar, self).map(|v| { - if let Some(_event_handler) = self.event_handler.as_ref() { - // TODO: Handle sse events - } - metrics::inc_counter(&metrics::BLOBS_SIDECAR_PROCESSING_SUCCESSES); - v - }) - } - /// Accepts some attestation-type object and attempts to verify it in the context of fork /// choice. If it is valid it is applied to `self.fork_choice`. /// @@ -2215,7 +2198,7 @@ impl BeaconChain { /// This method is potentially long-running and should not run on the core executor. pub fn filter_chain_segment( self: &Arc, - chain_segment: Vec>>, + chain_segment: Vec>, ) -> Result>, ChainSegmentResult> { // This function will never import any blocks. 
let imported_blocks = 0; @@ -2321,7 +2304,7 @@ impl BeaconChain { /// `Self::process_block`. pub async fn process_chain_segment( self: &Arc, - chain_segment: Vec>>, + chain_segment: Vec>, count_unrealized: CountUnrealized, ) -> ChainSegmentResult { let mut imported_blocks = 0; @@ -2343,7 +2326,10 @@ impl BeaconChain { } }; - while let Some((_root, block)) = filtered_chain_segment.first() { + while let Some((_root, block_wrapper)) = filtered_chain_segment.first() { + + let block: &SignedBeaconBlock = block_wrapper.block(); + // Determine the epoch of the first block in the remaining segment. let start_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch()); @@ -2354,7 +2340,7 @@ impl BeaconChain { let last_index = filtered_chain_segment .iter() .position(|(_root, block)| { - block.slot().epoch(T::EthSpec::slots_per_epoch()) > start_epoch + block.block().slot().epoch(T::EthSpec::slots_per_epoch()) > start_epoch }) .unwrap_or(filtered_chain_segment.len()); @@ -2420,17 +2406,17 @@ impl BeaconChain { /// Returns an `Err` if the given block was invalid, or an error was encountered during pub async fn verify_block_for_gossip( self: &Arc, - block: Arc>, + block_wrapper: BlockMaybeBlobs, ) -> Result, BlockError> { let chain = self.clone(); self.task_executor .clone() .spawn_blocking_handle( move || { - let slot = block.slot(); - let graffiti_string = block.message().body().graffiti().as_utf8_lossy(); + let slot = block_wrapper.block().slot(); + let graffiti_string = block_wrapper.block().message().body().graffiti().as_utf8_lossy(); - match GossipVerifiedBlock::new(block, &chain) { + match GossipVerifiedBlock::new(block_wrapper, &chain) { Ok(verified) => { debug!( chain.log, @@ -2486,9 +2472,6 @@ impl BeaconChain { // Increment the Prometheus counter for block processing requests. metrics::inc_counter(&metrics::BLOCK_PROCESSING_REQUESTS); - // Clone the block so we can provide it to the event handler. 
- let block = unverified_block.block().clone(); - // A small closure to group the verification and import errors. let chain = self.clone(); let import_block = async move { @@ -2499,6 +2482,8 @@ impl BeaconChain { .await }; + let slot = unverified_block.block().slot(); + // Verify and import the block. match import_block.await { // The block was successfully verified and imported. Yay. @@ -2507,7 +2492,7 @@ impl BeaconChain { self.log, "Beacon block imported"; "block_root" => ?block_root, - "block_slot" => %block.slot(), + "block_slot" => slot, ); // Increment the Prometheus counter for block processing successes. @@ -2633,7 +2618,7 @@ impl BeaconChain { #[allow(clippy::too_many_arguments)] fn import_block( &self, - signed_block: Arc>, + block_wrapper: BlockMaybeBlobs, block_root: Hash256, mut state: BeaconState, confirmed_state_roots: Vec, @@ -2642,6 +2627,7 @@ impl BeaconChain { parent_block: SignedBlindedBeaconBlock, parent_eth1_finalization_data: Eth1FinalizationData, ) -> Result> { + let signed_block = block_wrapper.block(); let current_slot = self.slot()?; let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch()); diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index d3e0d2a17d..0ca1785d02 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use derivative::Derivative; use slot_clock::SlotClock; @@ -79,58 +80,45 @@ impl From for BlobError { } } -/// A wrapper around a `BlobsSidecar` that indicates it has been verified w.r.t the corresponding -/// `SignedBeaconBlock`. 
-#[derive(Derivative)] -#[derivative(Debug(bound = "T: BeaconChainTypes"))] -pub struct VerifiedBlobsSidecar<'a, T: BeaconChainTypes> { - pub blob_sidecar: &'a BlobsSidecar, -} - -impl<'a, T: BeaconChainTypes> VerifiedBlobsSidecar<'a, T> { - pub fn verify( - blob_sidecar: &'a BlobsSidecar, - chain: &BeaconChain, - ) -> Result { - let blob_slot = blob_sidecar.beacon_block_slot; - // Do not gossip or process blobs from future or past slots. - let latest_permissible_slot = chain - .slot_clock - .now_with_future_tolerance(MAXIMUM_GOSSIP_CLOCK_DISPARITY) - .ok_or(BeaconChainError::UnableToReadSlot)?; - if blob_slot > latest_permissible_slot { - return Err(BlobError::FutureSlot { - message_slot: latest_permissible_slot, - latest_permissible_slot: blob_slot, - }); - } - - let earliest_permissible_slot = chain - .slot_clock - .now_with_past_tolerance(MAXIMUM_GOSSIP_CLOCK_DISPARITY) - .ok_or(BeaconChainError::UnableToReadSlot)?; - if blob_slot > earliest_permissible_slot { - return Err(BlobError::PastSlot { - message_slot: earliest_permissible_slot, - earliest_permissible_slot: blob_slot, - }); - } - - // Verify that blobs are properly formatted - //TODO: add the check while constructing a Blob type from bytes instead of after - for (i, blob) in blob_sidecar.blobs.iter().enumerate() { - if blob.iter().any(|b| *b >= *BLS_MODULUS) { - return Err(BlobError::BlobOutOfRange { blob_index: i }); - } - } - - // Verify that the KZG proof is a valid G1 point - if PublicKey::deserialize(&blob_sidecar.kzg_aggregate_proof.0).is_err() { - return Err(BlobError::InvalidKZGCommitment); - } - - // TODO: Check that we have not already received a sidecar with a valid signature for this slot. - - Ok(Self { blob_sidecar }) +pub fn validate_blob_for_gossip(blob_sidecar: &BlobsSidecar, chain: &Arc>) -> Result<(), BlobError>{ + let blob_slot = blob_sidecar.beacon_block_slot; + // Do not gossip or process blobs from future or past slots. 
+ let latest_permissible_slot = chain + .slot_clock + .now_with_future_tolerance(MAXIMUM_GOSSIP_CLOCK_DISPARITY) + .ok_or(BeaconChainError::UnableToReadSlot)?; + if blob_slot > latest_permissible_slot { + return Err(BlobError::FutureSlot { + message_slot: blob_slot, + latest_permissible_slot, + }); } + + let earliest_permissible_slot = chain + .slot_clock + .now_with_past_tolerance(MAXIMUM_GOSSIP_CLOCK_DISPARITY) + .ok_or(BeaconChainError::UnableToReadSlot)?; + if blob_slot < earliest_permissible_slot { + return Err(BlobError::PastSlot { + message_slot: blob_slot, + earliest_permissible_slot, + }); + } + + // Verify that blobs are properly formatted + //TODO: add the check while constructing a Blob type from bytes instead of after + for (i, blob) in blob_sidecar.blobs.iter().enumerate() { + if blob.iter().any(|b| *b >= *BLS_MODULUS) { + return Err(BlobError::BlobOutOfRange { blob_index: i }); + } + } + + // Verify that the KZG proof is a valid G1 point + if PublicKey::deserialize(&blob_sidecar.kzg_aggregate_proof.0).is_err() { + return Err(BlobError::InvalidKZGCommitment); + } + + // TODO: `validate_blobs_sidecar` + Ok(()) } + diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index f40f888133..15c4c0f528 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -83,12 +83,14 @@ use std::time::Duration; use store::{Error as DBError, HotStateSummary, KeyValueStore, StoreOp}; use task_executor::JoinHandle; use tree_hash::TreeHash; -use types::ExecPayload; +use types::{BlobsSidecar, ExecPayload, SignedBeaconBlockAndBlobsSidecar}; use types::{ BeaconBlockRef, BeaconState, BeaconStateError, BlindedPayload, ChainSpec, CloneConfig, Epoch, EthSpec, ExecutionBlockHash, Hash256, InconsistentFork, PublicKey, PublicKeyBytes, RelativeEpoch, SignedBeaconBlock, SignedBeaconBlockHeader, Slot, }; +use 
types::signed_block_and_blobs::BlockMaybeBlobs; +use crate::blob_verification::validate_blob_for_gossip; pub const POS_PANDA_BANNER: &str = r#" ,,, ,,, ,,, ,,, @@ -135,7 +137,7 @@ pub enum BlockError { /// /// It's unclear if this block is valid, but it cannot be processed without already knowing /// its parent. - ParentUnknown(Arc>), + ParentUnknown(BlockMaybeBlobs), /// The block skips too many slots and is a DoS risk. TooManySkippedSlots { parent_slot: Slot, block_slot: Slot }, /// The block slot is greater than the present slot. @@ -524,7 +526,7 @@ fn process_block_slash_info( /// The given `chain_segment` must contain only blocks from the same epoch, otherwise an error /// will be returned. pub fn signature_verify_chain_segment( - mut chain_segment: Vec<(Hash256, Arc>)>, + mut chain_segment: Vec<(Hash256, BlockMaybeBlobs)>, chain: &BeaconChain, ) -> Result>, BlockError> { if chain_segment.is_empty() { @@ -589,7 +591,7 @@ pub fn signature_verify_chain_segment( #[derive(Derivative)] #[derivative(Debug(bound = "T: BeaconChainTypes"))] pub struct GossipVerifiedBlock { - pub block: Arc>, + pub block: BlockMaybeBlobs, pub block_root: Hash256, parent: Option>, consensus_context: ConsensusContext, @@ -598,7 +600,7 @@ pub struct GossipVerifiedBlock { /// A wrapper around a `SignedBeaconBlock` that indicates that all signatures (except the deposit /// signatures) have been verified. pub struct SignatureVerifiedBlock { - block: Arc>, + block: BlockMaybeBlobs, block_root: Hash256, parent: Option>, consensus_context: ConsensusContext, @@ -620,7 +622,7 @@ type PayloadVerificationHandle = /// due to finality or some other event. A `ExecutionPendingBlock` should be imported into the /// `BeaconChain` immediately after it is instantiated. 
pub struct ExecutionPendingBlock { - pub block: Arc>, + pub block: BlockMaybeBlobs, pub block_root: Hash256, pub state: BeaconState, pub parent_block: SignedBeaconBlock>, @@ -665,7 +667,7 @@ impl GossipVerifiedBlock { /// /// Returns an error if the block is invalid, or if the block was unable to be verified. pub fn new( - block: Arc>, + block: BlockMaybeBlobs, chain: &BeaconChain, ) -> Result> { // If the block is valid for gossip we don't supply it to the slasher here because @@ -680,9 +682,10 @@ impl GossipVerifiedBlock { /// As for new, but doesn't pass the block to the slasher. fn new_without_slasher_checks( - block: Arc>, + block_wrapper: BlockMaybeBlobs, chain: &BeaconChain, ) -> Result> { + let block = block_wrapper.block(); // Ensure the block is the correct structure for the fork at `block.slot()`. block .fork_name(&chain.spec) @@ -876,13 +879,17 @@ impl GossipVerifiedBlock { // Validate the block's execution_payload (if any). validate_execution_payload_for_gossip(&parent_block, block.message(), chain)?; + if let Some(blobs_sidecar) = block_wrapper.blobs() { + validate_blob_for_gossip(blobs_sidecar, chain)?; + } + // Having checked the proposer index and the block root we can cache them. let consensus_context = ConsensusContext::new(block.slot()) .set_current_block_root(block_root) .set_proposer_index(block.message().proposer_index()); Ok(Self { - block, + block: block_wrapper, block_root, parent, consensus_context, @@ -917,7 +924,7 @@ impl SignatureVerifiedBlock { /// /// Returns an error if the block is invalid, or if the block was unable to be verified. pub fn new( - block: Arc>, + block: BlockMaybeBlobs, block_root: Hash256, chain: &BeaconChain, ) -> Result> { @@ -963,7 +970,7 @@ impl SignatureVerifiedBlock { /// As for `new` above but producing `BlockSlashInfo`. 
pub fn check_slashable( - block: Arc>, + block: BlockMaybeBlobs, block_root: Hash256, chain: &BeaconChain, ) -> Result>> { @@ -1057,7 +1064,7 @@ impl IntoExecutionPendingBlock for SignatureVerifiedBloc } } -impl IntoExecutionPendingBlock for Arc> { +impl IntoExecutionPendingBlock for BlockMaybeBlobs { /// Verifies the `SignedBeaconBlock` by first transforming it into a `SignatureVerifiedBlock` /// and then using that implementation of `IntoExecutionPendingBlock` to complete verification. fn into_execution_pending_block_slashable( @@ -1074,7 +1081,10 @@ impl IntoExecutionPendingBlock for Arc &SignedBeaconBlock { - self + match self { + Self::Block(block) => block, + Self::BlockAndBlobs(block) => &block.beacon_block, + } } } @@ -1087,12 +1097,14 @@ impl ExecutionPendingBlock { /// /// Returns an error if the block is invalid, or if the block was unable to be verified. pub fn from_signature_verified_components( - block: Arc>, + block_wrapper: BlockMaybeBlobs, block_root: Hash256, parent: PreProcessingSnapshot, mut consensus_context: ConsensusContext, chain: &Arc>, ) -> Result> { + let block = block_wrapper.block(); + if let Some(parent) = chain .canonical_head .fork_choice_read_lock() @@ -1116,7 +1128,7 @@ impl ExecutionPendingBlock { // because it will revert finalization. Note that the finalized block is stored in fork // choice, so we will not reject any child of the finalized block (this is relevant during // genesis). - return Err(BlockError::ParentUnknown(block)); + return Err(BlockError::ParentUnknown(block_wrapper)); } // Reject any block that exceeds our limit on skipped slots. @@ -1532,7 +1544,8 @@ pub fn check_block_is_finalized_descendant( block_parent_root: block.parent_root(), }) } else { - Err(BlockError::ParentUnknown(block.clone())) + //FIXME(sean) does this matter if it only returns a block? 
+ Err(BlockError::ParentUnknown(BlockMaybeBlobs::Block(block.clone()))) } } } @@ -1624,15 +1637,16 @@ fn verify_parent_block_is_known( #[allow(clippy::type_complexity)] fn load_parent( block_root: Hash256, - block: Arc>, + block_wrapper: BlockMaybeBlobs, chain: &BeaconChain, ) -> Result< ( PreProcessingSnapshot, - Arc>, + BlockMaybeBlobs, ), BlockError, > { + let block = block_wrapper.block(); let spec = &chain.spec; // Reject any block if its parent is not known to fork choice. @@ -1650,7 +1664,7 @@ fn load_parent( .fork_choice_read_lock() .contains_block(&block.parent_root()) { - return Err(BlockError::ParentUnknown(block)); + return Err(BlockError::ParentUnknown(block_wrapper)); } let block_delay = chain @@ -1689,7 +1703,7 @@ fn load_parent( "block_delay" => ?block_delay, ); } - Ok((snapshot, block)) + Ok((snapshot, block_wrapper)) } else { // Load the blocks parent block from the database, returning invalid if that block is not // found. @@ -1736,7 +1750,7 @@ fn load_parent( pre_state: parent_state, beacon_state_root: Some(parent_state_root), }, - block, + block_wrapper, )) }; diff --git a/beacon_node/lighthouse_network/src/lib.rs b/beacon_node/lighthouse_network/src/lib.rs index d7733f7cd3..be4da809cb 100644 --- a/beacon_node/lighthouse_network/src/lib.rs +++ b/beacon_node/lighthouse_network/src/lib.rs @@ -15,7 +15,6 @@ pub mod peer_manager; pub mod rpc; pub mod types; -pub use crate::types::SignedBeaconBlockAndBlobsSidecar; pub use config::gossip_max_size; use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index 1029204ae6..0ad1264d62 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -502,6 +502,7 @@ impl PeerManager { Protocol::BlocksByRange => PeerAction::MidToleranceError, Protocol::BlocksByRoot => PeerAction::MidToleranceError, 
Protocol::BlobsByRange => PeerAction::MidToleranceError, + Protocol::BlobsByRoot => PeerAction::MidToleranceError, Protocol::Goodbye => PeerAction::LowToleranceError, Protocol::MetaData => PeerAction::LowToleranceError, Protocol::Status => PeerAction::LowToleranceError, @@ -518,6 +519,7 @@ impl PeerManager { Protocol::BlocksByRange => return, Protocol::BlocksByRoot => return, Protocol::BlobsByRange => return, + Protocol::BlobsByRoot => return, Protocol::Goodbye => return, Protocol::MetaData => PeerAction::LowToleranceError, Protocol::Status => PeerAction::LowToleranceError, @@ -534,6 +536,7 @@ impl PeerManager { Protocol::BlocksByRange => PeerAction::MidToleranceError, Protocol::BlocksByRoot => PeerAction::MidToleranceError, Protocol::BlobsByRange => PeerAction::MidToleranceError, + Protocol::BlobsByRoot => PeerAction::MidToleranceError, Protocol::Goodbye => return, Protocol::MetaData => return, Protocol::Status => return, diff --git a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs index d016d85f74..f6b99881b0 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs @@ -3,7 +3,7 @@ use crate::rpc::{ protocol::{Encoding, Protocol, ProtocolId, RPCError, Version, ERROR_TYPE_MAX, ERROR_TYPE_MIN}, }; use crate::rpc::{InboundRequest, OutboundRequest, RPCCodedResponse, RPCResponse}; -use crate::{rpc::methods::*, EnrSyncCommitteeBitfield, SignedBeaconBlockAndBlobsSidecar}; +use crate::{rpc::methods::*, EnrSyncCommitteeBitfield}; use libp2p::bytes::BytesMut; use snap::read::FrameDecoder; use snap::write::FrameEncoder; @@ -15,11 +15,7 @@ use std::io::{Read, Write}; use std::marker::PhantomData; use std::sync::Arc; use tokio_util::codec::{Decoder, Encoder}; -use types::{ - BlobsSidecar, EthSpec, ForkContext, ForkName, SignedBeaconBlock, SignedBeaconBlockAltair, - SignedBeaconBlockBase, SignedBeaconBlockCapella, 
SignedBeaconBlockEip4844, - SignedBeaconBlockMerge, -}; +use types::{BlobsSidecar, EthSpec, ForkContext, ForkName, SignedBeaconBlock, SignedBeaconBlockAltair, SignedBeaconBlockAndBlobsSidecar, SignedBeaconBlockBase, SignedBeaconBlockCapella, SignedBeaconBlockEip4844, SignedBeaconBlockMerge}; use unsigned_varint::codec::Uvi; const CONTEXT_BYTES_LEN: usize = 4; @@ -72,6 +68,7 @@ impl Encoder> for SSZSnappyInboundCodec< RPCResponse::BlocksByRange(res) => res.as_ssz_bytes(), RPCResponse::BlocksByRoot(res) => res.as_ssz_bytes(), RPCResponse::BlobsByRange(res) => res.as_ssz_bytes(), + RPCResponse::BlobsByRoot(res) => res.as_ssz_bytes(), RPCResponse::Pong(res) => res.data.as_ssz_bytes(), RPCResponse::MetaData(res) => // Encode the correct version of the MetaData response based on the negotiated version. @@ -231,6 +228,7 @@ impl Encoder> for SSZSnappyOutboundCodec< OutboundRequest::BlocksByRange(req) => req.as_ssz_bytes(), OutboundRequest::BlocksByRoot(req) => req.block_roots.as_ssz_bytes(), OutboundRequest::BlobsByRange(req) => req.as_ssz_bytes(), + OutboundRequest::BlobsByRoot(req) => req.block_roots.as_ssz_bytes(), OutboundRequest::Ping(req) => req.as_ssz_bytes(), OutboundRequest::MetaData(_) => return Ok(()), // no metadata to encode }; @@ -313,7 +311,8 @@ impl Decoder for SSZSnappyOutboundCodec { let _read_bytes = src.split_to(n as usize); match self.protocol.version { - Version::V1 => handle_v1_response(self.protocol.message_name, &decoded_buffer), + Version::V1 => handle_v1_response(self.protocol.message_name, &decoded_buffer, + &mut self.fork_name, ), Version::V2 => handle_v2_response( self.protocol.message_name, &decoded_buffer, @@ -483,6 +482,11 @@ fn handle_v1_request( Protocol::BlobsByRange => Ok(Some(InboundRequest::BlobsByRange( BlobsByRangeRequest::from_ssz_bytes(decoded_buffer)?, ))), + Protocol::BlobsByRoot => Ok(Some(InboundRequest::BlobsByRoot( + BlobsByRootRequest{ + block_roots: VariableList::from_ssz_bytes(decoded_buffer)?, + }, + ))), Protocol::Ping 
=> Ok(Some(InboundRequest::Ping(Ping { data: u64::from_ssz_bytes(decoded_buffer)?, }))), @@ -540,6 +544,7 @@ fn handle_v2_request( fn handle_v1_response( protocol: Protocol, decoded_buffer: &[u8], + fork_name: &mut Option, ) -> Result>, RPCError> { match protocol { Protocol::Status => Ok(Some(RPCResponse::Status(StatusMessage::from_ssz_bytes( @@ -555,7 +560,40 @@ fn handle_v1_response( Protocol::BlocksByRoot => Ok(Some(RPCResponse::BlocksByRoot(Arc::new( SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes(decoded_buffer)?), )))), - Protocol::BlobsByRange => Err(RPCError::InvalidData("blobs by range via v1".to_string())), + Protocol::BlobsByRange => { + let fork_name = fork_name.take().ok_or_else(|| { + RPCError::ErrorResponse( + RPCResponseErrorCode::InvalidRequest, + format!("No context bytes provided for {} response", protocol), + ) + })?; + match fork_name { + ForkName::Eip4844 => Ok(Some(RPCResponse::BlobsByRange(Arc::new( + SignedBeaconBlockAndBlobsSidecar::from_ssz_bytes(decoded_buffer)?, + )))), + _ => Err(RPCError::ErrorResponse( + RPCResponseErrorCode::InvalidRequest, + "Invalid forkname for blobsbyrange".to_string(), + )), + } + }, + Protocol::BlobsByRoot => { + let fork_name = fork_name.take().ok_or_else(|| { + RPCError::ErrorResponse( + RPCResponseErrorCode::InvalidRequest, + format!("No context bytes provided for {} response", protocol), + ) + })?; + match fork_name { + ForkName::Eip4844 => Ok(Some(RPCResponse::BlobsByRoot(Arc::new( + SignedBeaconBlockAndBlobsSidecar::from_ssz_bytes(decoded_buffer)?, + )))), + _ => Err(RPCError::ErrorResponse( + RPCResponseErrorCode::InvalidRequest, + "Invalid forkname for blobsbyroot".to_string(), + )), + } + }, Protocol::Ping => Ok(Some(RPCResponse::Pong(Ping { data: u64::from_ssz_bytes(decoded_buffer)?, }))), @@ -640,15 +678,8 @@ fn handle_v2_response( )?), )))), }, - Protocol::BlobsByRange => match fork_name { - ForkName::Eip4844 => Ok(Some(RPCResponse::BlobsByRange(Arc::new( - 
SignedBeaconBlockAndBlobsSidecar::from_ssz_bytes(decoded_buffer)?, - )))), - _ => Err(RPCError::ErrorResponse( - RPCResponseErrorCode::InvalidRequest, - "Invalid forkname for blobsbyrange".to_string(), - )), - }, + Protocol::BlobsByRange => Err(RPCError::InvalidData("blobs by range via v2".to_string())), + Protocol::BlobsByRoot => Err(RPCError::InvalidData("blobs by root via v2".to_string())), _ => Err(RPCError::ErrorResponse( RPCResponseErrorCode::InvalidRequest, "Invalid v2 request".to_string(), @@ -917,6 +948,9 @@ mod tests { OutboundRequest::BlobsByRange(blbrange) => { assert_eq!(decoded, InboundRequest::BlobsByRange(blbrange)) } + OutboundRequest::BlobsByRoot(blbroot) => { + assert_eq!(decoded, InboundRequest::BlobsByRoot(blbroot)) + } OutboundRequest::Ping(ping) => { assert_eq!(decoded, InboundRequest::Ping(ping)) } diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 49dc977ff1..65e0293b86 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -14,7 +14,7 @@ use strum::IntoStaticStr; use superstruct::superstruct; use types::blobs_sidecar::BlobsSidecar; use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; -use crate::SignedBeaconBlockAndBlobsSidecar; +use types::SignedBeaconBlockAndBlobsSidecar; /// Maximum number of blocks in a single request. pub type MaxRequestBlocks = U1024; @@ -243,6 +243,13 @@ pub struct BlocksByRootRequest { pub block_roots: VariableList, } +/// Request a number of beacon blocks and blobs from a peer. +#[derive(Clone, Debug, PartialEq)] +pub struct BlobsByRootRequest { + /// The list of beacon block roots being requested. 
+ pub block_roots: VariableList, +} + /* RPC Handling and Grouping */ // Collection of enums and structs used by the Codecs to encode/decode RPC messages @@ -261,6 +268,9 @@ pub enum RPCResponse { /// A response to a get BLOBS_BY_RANGE request BlobsByRange(Arc>), + /// A response to a get BLOBS_BY_ROOT request. + BlobsByRoot(Arc>), + /// A PONG response to a PING request. Pong(Ping), @@ -279,6 +289,9 @@ pub enum ResponseTermination { /// Blobs by range stream termination. BlobsByRange, + + /// Blobs by root stream termination. + BlobsByRoot, } /// The structured response containing a result/code indicating success or failure @@ -341,6 +354,7 @@ impl RPCCodedResponse { RPCResponse::BlocksByRange(_) => true, RPCResponse::BlocksByRoot(_) => true, RPCResponse::BlobsByRange(_) => true, + RPCResponse::BlobsByRoot(_) => true, RPCResponse::Pong(_) => false, RPCResponse::MetaData(_) => false, }, @@ -376,6 +390,7 @@ impl RPCResponse { RPCResponse::BlocksByRange(_) => Protocol::BlocksByRange, RPCResponse::BlocksByRoot(_) => Protocol::BlocksByRoot, RPCResponse::BlobsByRange(_) => Protocol::BlobsByRange, + RPCResponse::BlobsByRoot(_) => Protocol::BlobsByRoot, RPCResponse::Pong(_) => Protocol::Ping, RPCResponse::MetaData(_) => Protocol::MetaData, } @@ -412,7 +427,10 @@ impl std::fmt::Display for RPCResponse { write!(f, "BlocksByRoot: Block slot: {}", block.slot()) } RPCResponse::BlobsByRange(blob) => { - write!(f, "BlobsByRange: Blob slot: {}", blob.beacon_block_slot) + write!(f, "BlobsByRange: Blob slot: {}", blob.blobs_sidecar.beacon_block_slot) + } + RPCResponse::BlobsByRoot(blob) => { + write!(f, "BlobsByRoot: Blob slot: {}", blob.blobs_sidecar.beacon_block_slot) } RPCResponse::Pong(ping) => write!(f, "Pong: {}", ping.data), RPCResponse::MetaData(metadata) => write!(f, "Metadata: {}", metadata.seq_number()), diff --git a/beacon_node/lighthouse_network/src/rpc/mod.rs b/beacon_node/lighthouse_network/src/rpc/mod.rs index 75e78b0b32..1ccb4b4313 100644 --- 
a/beacon_node/lighthouse_network/src/rpc/mod.rs +++ b/beacon_node/lighthouse_network/src/rpc/mod.rs @@ -300,6 +300,7 @@ where ResponseTermination::BlocksByRange => Protocol::BlocksByRange, ResponseTermination::BlocksByRoot => Protocol::BlocksByRoot, ResponseTermination::BlobsByRange => Protocol::BlobsByRange, + ResponseTermination::BlobsByRoot => Protocol::BlobsByRoot, }, ), }, diff --git a/beacon_node/lighthouse_network/src/rpc/outbound.rs b/beacon_node/lighthouse_network/src/rpc/outbound.rs index a2029fd24c..0de0eddea5 100644 --- a/beacon_node/lighthouse_network/src/rpc/outbound.rs +++ b/beacon_node/lighthouse_network/src/rpc/outbound.rs @@ -39,6 +39,7 @@ pub enum OutboundRequest { BlocksByRange(OldBlocksByRangeRequest), BlocksByRoot(BlocksByRootRequest), BlobsByRange(BlobsByRangeRequest), + BlobsByRoot(BlobsByRootRequest), Ping(Ping), MetaData(PhantomData), } @@ -81,6 +82,9 @@ impl OutboundRequest { Version::V1, Encoding::SSZSnappy, )], + OutboundRequest::BlobsByRoot(_) => vec![ + ProtocolId::new(Protocol::BlobsByRoot, Version::V1, Encoding::SSZSnappy), + ], OutboundRequest::Ping(_) => vec![ProtocolId::new( Protocol::Ping, Version::V1, @@ -103,6 +107,7 @@ impl OutboundRequest { OutboundRequest::BlocksByRange(req) => req.count, OutboundRequest::BlocksByRoot(req) => req.block_roots.len() as u64, OutboundRequest::BlobsByRange(req) => req.count, + OutboundRequest::BlobsByRoot(req) => req.block_roots.len() as u64, OutboundRequest::Ping(_) => 1, OutboundRequest::MetaData(_) => 1, } @@ -116,6 +121,7 @@ impl OutboundRequest { OutboundRequest::BlocksByRange(_) => Protocol::BlocksByRange, OutboundRequest::BlocksByRoot(_) => Protocol::BlocksByRoot, OutboundRequest::BlobsByRange(_) => Protocol::BlobsByRange, + OutboundRequest::BlobsByRoot(_) => Protocol::BlobsByRoot, OutboundRequest::Ping(_) => Protocol::Ping, OutboundRequest::MetaData(_) => Protocol::MetaData, } @@ -130,6 +136,7 @@ impl OutboundRequest { OutboundRequest::BlocksByRange(_) => 
ResponseTermination::BlocksByRange, OutboundRequest::BlocksByRoot(_) => ResponseTermination::BlocksByRoot, OutboundRequest::BlobsByRange(_) => ResponseTermination::BlobsByRange, + OutboundRequest::BlobsByRoot(_) => ResponseTermination::BlobsByRoot, OutboundRequest::Status(_) => unreachable!(), OutboundRequest::Goodbye(_) => unreachable!(), OutboundRequest::Ping(_) => unreachable!(), @@ -186,6 +193,7 @@ impl std::fmt::Display for OutboundRequest { OutboundRequest::BlocksByRange(req) => write!(f, "Blocks by range: {}", req), OutboundRequest::BlocksByRoot(req) => write!(f, "Blocks by root: {:?}", req), OutboundRequest::BlobsByRange(req) => write!(f, "Blobs by range: {:?}", req), + OutboundRequest::BlobsByRoot(req) => write!(f, "Blobs by root: {:?}", req), OutboundRequest::Ping(ping) => write!(f, "Ping: {}", ping.data), OutboundRequest::MetaData(_) => write!(f, "MetaData request"), } diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index 8511d26208..ed8260e3bd 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -107,12 +107,6 @@ lazy_static! { .as_ssz_bytes() .len(); - pub static ref BLOBS_SIDECAR_MIN: usize = BlobsSidecar::::empty() - .as_ssz_bytes() - .len(); - - pub static ref BLOBS_SIDECAR_MAX: usize = *BLOBS_SIDECAR_MIN // Max size of variable length `blobs` field - + (MainnetEthSpec::max_blobs_per_block() * as Encode>::ssz_fixed_len()); } /// The maximum bytes that can be sent across the RPC pre-merge. @@ -181,6 +175,8 @@ pub enum Protocol { BlocksByRoot, /// The `BlobsByRange` protocol name. BlobsByRange, + /// The `BlobsByRoot` protocol name. + BlobsByRoot, /// The `Ping` protocol name. Ping, /// The `MetaData` protocol name. 
@@ -210,6 +206,7 @@ impl std::fmt::Display for Protocol { Protocol::BlocksByRange => "beacon_blocks_by_range", Protocol::BlocksByRoot => "beacon_blocks_by_root", Protocol::BlobsByRange => "blobs_sidecars_by_range", + Protocol::BlobsByRoot => "beacon_block_and_blobs_sidecar_by_root", Protocol::Ping => "ping", Protocol::MetaData => "metadata", }; @@ -322,6 +319,9 @@ impl ProtocolId { ::ssz_fixed_len(), ::ssz_fixed_len(), ), + Protocol::BlobsByRoot => { + RpcLimits::new(*BLOCKS_BY_ROOT_REQUEST_MIN, *BLOCKS_BY_ROOT_REQUEST_MAX) + } Protocol::Ping => RpcLimits::new( ::ssz_fixed_len(), ::ssz_fixed_len(), @@ -340,7 +340,11 @@ impl ProtocolId { Protocol::Goodbye => RpcLimits::new(0, 0), // Goodbye request has no response Protocol::BlocksByRange => rpc_block_limits_by_fork(fork_context.current_fork()), Protocol::BlocksByRoot => rpc_block_limits_by_fork(fork_context.current_fork()), - Protocol::BlobsByRange => RpcLimits::new(*BLOBS_SIDECAR_MIN, *BLOBS_SIDECAR_MAX), + + //FIXME(sean) add blob sizes + Protocol::BlobsByRange => rpc_block_limits_by_fork(fork_context.current_fork()), + Protocol::BlobsByRoot => rpc_block_limits_by_fork(fork_context.current_fork()), + Protocol::Ping => RpcLimits::new( ::ssz_fixed_len(), ::ssz_fixed_len(), @@ -455,6 +459,7 @@ pub enum InboundRequest { BlocksByRange(OldBlocksByRangeRequest), BlocksByRoot(BlocksByRootRequest), BlobsByRange(BlobsByRangeRequest), + BlobsByRoot(BlobsByRootRequest), Ping(Ping), MetaData(PhantomData), } @@ -499,6 +504,11 @@ impl InboundRequest { Version::V1, Encoding::SSZSnappy, )], + InboundRequest::BlobsByRoot(_) => vec![ProtocolId::new( + Protocol::BlobsByRoot, + Version::V1, + Encoding::SSZSnappy, + )], InboundRequest::Ping(_) => vec![ProtocolId::new( Protocol::Ping, Version::V1, @@ -521,6 +531,7 @@ impl InboundRequest { InboundRequest::BlocksByRange(req) => req.count, InboundRequest::BlocksByRoot(req) => req.block_roots.len() as u64, InboundRequest::BlobsByRange(req) => req.count, + InboundRequest::BlobsByRoot(req) => 
req.block_roots.len() as u64, InboundRequest::Ping(_) => 1, InboundRequest::MetaData(_) => 1, } @@ -534,6 +545,7 @@ impl InboundRequest { InboundRequest::BlocksByRange(_) => Protocol::BlocksByRange, InboundRequest::BlocksByRoot(_) => Protocol::BlocksByRoot, InboundRequest::BlobsByRange(_) => Protocol::BlobsByRange, + InboundRequest::BlobsByRoot(_) => Protocol::BlobsByRoot, InboundRequest::Ping(_) => Protocol::Ping, InboundRequest::MetaData(_) => Protocol::MetaData, } @@ -548,6 +560,7 @@ impl InboundRequest { InboundRequest::BlocksByRange(_) => ResponseTermination::BlocksByRange, InboundRequest::BlocksByRoot(_) => ResponseTermination::BlocksByRoot, InboundRequest::BlobsByRange(_) => ResponseTermination::BlobsByRange, + InboundRequest::BlobsByRoot(_) => ResponseTermination::BlobsByRoot, InboundRequest::Status(_) => unreachable!(), InboundRequest::Goodbye(_) => unreachable!(), InboundRequest::Ping(_) => unreachable!(), @@ -654,6 +667,7 @@ impl std::fmt::Display for InboundRequest { InboundRequest::BlocksByRange(req) => write!(f, "Blocks by range: {}", req), InboundRequest::BlocksByRoot(req) => write!(f, "Blocks by root: {:?}", req), InboundRequest::BlobsByRange(req) => write!(f, "Blobs by range: {:?}", req), + InboundRequest::BlobsByRoot(req) => write!(f, "Blobs by root: {:?}", req), InboundRequest::Ping(ping) => write!(f, "Ping: {}", ping.data), InboundRequest::MetaData(_) => write!(f, "MetaData request"), } diff --git a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs index 6aa91aab6b..21f4dca7b5 100644 --- a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs @@ -75,6 +75,8 @@ pub struct RPCRateLimiter { bbroots_rl: Limiter, /// BlobsByRange rate limiter. blbrange_rl: Limiter, + /// BlobsByRoot rate limiter. 
+ blbroot_rl: Limiter, } /// Error type for non conformant requests @@ -102,6 +104,8 @@ pub struct RPCRateLimiterBuilder { bbroots_quota: Option, /// Quota for the BlobsByRange protocol. blbrange_quota: Option, + /// Quota for the BlobsByRoot protocol. + blbroot_quota: Option, } impl RPCRateLimiterBuilder { @@ -121,6 +125,7 @@ impl RPCRateLimiterBuilder { Protocol::BlocksByRange => self.bbrange_quota = q, Protocol::BlocksByRoot => self.bbroots_quota = q, Protocol::BlobsByRange => self.blbrange_quota = q, + Protocol::BlobsByRoot => self.blbroot_quota = q, } self } @@ -165,6 +170,10 @@ impl RPCRateLimiterBuilder { .blbrange_quota .ok_or("BlobsByRange quota not specified")?; + let blbroots_quota = self + .blbroot_quota + .ok_or("BlobsByRoot quota not specified")?; + // create the rate limiters let ping_rl = Limiter::from_quota(ping_quota)?; let metadata_rl = Limiter::from_quota(metadata_quota)?; @@ -173,6 +182,7 @@ impl RPCRateLimiterBuilder { let bbroots_rl = Limiter::from_quota(bbroots_quota)?; let bbrange_rl = Limiter::from_quota(bbrange_quota)?; let blbrange_rl = Limiter::from_quota(blbrange_quota)?; + let blbroot_rl = Limiter::from_quota(blbroots_quota)?; // check for peers to prune every 30 seconds, starting in 30 seconds let prune_every = tokio::time::Duration::from_secs(30); @@ -187,6 +197,7 @@ impl RPCRateLimiterBuilder { bbroots_rl, bbrange_rl, blbrange_rl, + blbroot_rl, init_time: Instant::now(), }) } @@ -211,6 +222,7 @@ impl RPCRateLimiter { Protocol::BlocksByRange => &mut self.bbrange_rl, Protocol::BlocksByRoot => &mut self.bbroots_rl, Protocol::BlobsByRange => &mut self.blbrange_rl, + Protocol::BlobsByRoot => &mut self.blbroot_rl, }; check(limiter) } @@ -224,6 +236,7 @@ impl RPCRateLimiter { self.bbrange_rl.prune(time_since_start); self.bbroots_rl.prune(time_since_start); self.blbrange_rl.prune(time_since_start); + self.blbroot_rl.prune(time_since_start); } } diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs 
b/beacon_node/lighthouse_network/src/service/api_types.rs index 190b86f0a6..8dae3e25e1 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use libp2p::core::connection::ConnectionId; use types::{BlobsSidecar, EthSpec, SignedBeaconBlock}; -use crate::rpc::methods::BlobsByRangeRequest; +use crate::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest}; use crate::rpc::{ methods::{ BlocksByRangeRequest, BlocksByRootRequest, OldBlocksByRangeRequest, RPCCodedResponse, @@ -11,7 +11,7 @@ use crate::rpc::{ }, OutboundRequest, SubstreamId, }; -use crate::SignedBeaconBlockAndBlobsSidecar; +use types::SignedBeaconBlockAndBlobsSidecar; /// Identifier of requests sent by a peer. pub type PeerRequestId = (ConnectionId, SubstreamId); @@ -38,6 +38,8 @@ pub enum Request { BlobsByRange(BlobsByRangeRequest), /// A request blocks root request. BlocksByRoot(BlocksByRootRequest), + /// A request blobs root request. + BlobsByRoot(BlobsByRootRequest), } impl std::convert::From for OutboundRequest { @@ -52,6 +54,7 @@ impl std::convert::From for OutboundRequest { }) } Request::BlobsByRange(r) => OutboundRequest::BlobsByRange(r), + Request::BlobsByRoot(r) => OutboundRequest::BlobsByRoot(r), Request::Status(s) => OutboundRequest::Status(s), } } @@ -73,6 +76,8 @@ pub enum Response { BlobsByRange(Option>>), /// A response to a get BLOCKS_BY_ROOT request. BlocksByRoot(Option>>), + /// A response to a get BLOBS_BY_ROOT request. 
+ BlobsByRoot(Option>>), } impl std::convert::From> for RPCCodedResponse { @@ -86,6 +91,10 @@ impl std::convert::From> for RPCCodedResponse RPCCodedResponse::Success(RPCResponse::BlocksByRange(b)), None => RPCCodedResponse::StreamTermination(ResponseTermination::BlocksByRange), }, + Response::BlobsByRoot(r) => match r { + Some(b) => RPCCodedResponse::Success(RPCResponse::BlobsByRoot(b)), + None => RPCCodedResponse::StreamTermination(ResponseTermination::BlobsByRoot), + }, Response::BlobsByRange(r) => match r { Some(b) => RPCCodedResponse::Success(RPCResponse::BlobsByRange(b)), None => RPCCodedResponse::StreamTermination(ResponseTermination::BlobsByRange), diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 5e770db2e9..b06134a8f9 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -989,6 +989,9 @@ impl Network { Request::BlobsByRange { .. } => { metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["blobs_by_range"]) } + Request::BlobsByRoot { .. 
} => { + metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["blobs_by_root"]) + } } NetworkEvent::RequestReceived { peer_id, @@ -1260,6 +1263,14 @@ impl Network { ); Some(event) } + InboundRequest::BlobsByRoot(req) => { + let event = self.build_request( + peer_request_id, + peer_id, + Request::BlobsByRoot(req), + ); + Some(event) + } } } Ok(RPCReceived::Response(id, resp)) => { @@ -1290,6 +1301,9 @@ impl Network { RPCResponse::BlocksByRoot(resp) => { self.build_response(id, peer_id, Response::BlocksByRoot(Some(resp))) } + RPCResponse::BlobsByRoot(resp) => { + self.build_response(id, peer_id, Response::BlobsByRoot(Some(resp))) + } } } Ok(RPCReceived::EndOfStream(id, termination)) => { @@ -1297,6 +1311,7 @@ impl Network { ResponseTermination::BlocksByRange => Response::BlocksByRange(None), ResponseTermination::BlocksByRoot => Response::BlocksByRoot(None), ResponseTermination::BlobsByRange => Response::BlobsByRange(None), + ResponseTermination::BlobsByRoot => Response::BlobsByRoot(None), }; self.build_response(id, peer_id, response) } diff --git a/beacon_node/lighthouse_network/src/types/mod.rs b/beacon_node/lighthouse_network/src/types/mod.rs index 404311ac16..ad02e07fb7 100644 --- a/beacon_node/lighthouse_network/src/types/mod.rs +++ b/beacon_node/lighthouse_network/src/types/mod.rs @@ -13,7 +13,7 @@ pub type EnrSyncCommitteeBitfield = BitVector<::SyncCommitteeSu pub type Enr = discv5::enr::Enr; pub use globals::NetworkGlobals; -pub use pubsub::{PubsubMessage, SignedBeaconBlockAndBlobsSidecar, SnappyTransform}; +pub use pubsub::{PubsubMessage, SnappyTransform}; pub use subnet::{Subnet, SubnetDiscovery}; pub use sync_state::{BackFillState, SyncState}; pub use topics::{subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, CORE_TOPICS}; diff --git a/beacon_node/lighthouse_network/src/types/pubsub.rs b/beacon_node/lighthouse_network/src/types/pubsub.rs index 172194816d..9663c06b67 100644 --- a/beacon_node/lighthouse_network/src/types/pubsub.rs +++ 
b/beacon_node/lighthouse_network/src/types/pubsub.rs @@ -11,12 +11,7 @@ use std::boxed::Box; use std::io::{Error, ErrorKind}; use std::sync::Arc; use tree_hash_derive::TreeHash; -use types::{ - Attestation, AttesterSlashing, BlobsSidecar, EthSpec, ForkContext, ForkName, ProposerSlashing, - SignedAggregateAndProof, SignedBeaconBlock, SignedBeaconBlockAltair, SignedBeaconBlockBase, - SignedBeaconBlockCapella, SignedBeaconBlockEip4844, SignedBeaconBlockMerge, - SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncCommitteeMessage, SyncSubnetId, -}; +use types::{Attestation, AttesterSlashing, BlobsSidecar, EthSpec, ForkContext, ForkName, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedBeaconBlockAltair, SignedBeaconBlockAndBlobsSidecar, SignedBeaconBlockBase, SignedBeaconBlockCapella, SignedBeaconBlockEip4844, SignedBeaconBlockMerge, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncCommitteeMessage, SyncSubnetId}; #[derive(Debug, Clone, PartialEq)] pub enum PubsubMessage { @@ -286,7 +281,7 @@ impl std::fmt::Display for PubsubMessage { PubsubMessage::BeaconBlockAndBlobsSidecars(block_and_blob) => write!( f, "Beacon block and Blobs Sidecar: slot: {}, blobs: {}", - block_and_blob.beacon_block.message.slot, + block_and_blob.beacon_block.message().slot(), block_and_blob.blobs_sidecar.blobs.len(), ), PubsubMessage::AggregateAndProofAttestation(att) => write!( diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index dd28b15c0c..cf3bba65b1 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -45,8 +45,7 @@ use beacon_chain::{BeaconChain, BeaconChainTypes, GossipVerifiedBlock}; use derivative::Derivative; use futures::stream::{Stream, StreamExt}; use futures::task::Poll; -use lighthouse_network::rpc::methods::BlobsByRangeRequest; -use lighthouse_network::SignedBeaconBlockAndBlobsSidecar; +use 
lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest}; use lighthouse_network::{ rpc::{BlocksByRangeRequest, BlocksByRootRequest, StatusMessage}, Client, MessageId, NetworkGlobals, PeerId, PeerRequestId, @@ -62,11 +61,7 @@ use std::time::Duration; use std::{cmp, collections::HashSet}; use task_executor::TaskExecutor; use tokio::sync::mpsc; -use types::{ - Attestation, AttesterSlashing, Hash256, ProposerSlashing, SignedAggregateAndProof, - SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, - SyncCommitteeMessage, SyncSubnetId, -}; +use types::{Attestation, AttesterSlashing, Hash256, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncCommitteeMessage, SyncSubnetId}; use work_reprocessing_queue::{ spawn_reprocess_scheduler, QueuedAggregate, QueuedRpcBlock, QueuedUnaggregate, ReadyWork, }; @@ -204,6 +199,7 @@ pub const STATUS_PROCESSING: &str = "status_processing"; pub const BLOCKS_BY_RANGE_REQUEST: &str = "blocks_by_range_request"; pub const BLOCKS_BY_ROOTS_REQUEST: &str = "blocks_by_roots_request"; pub const BLOBS_BY_RANGE_REQUEST: &str = "blobs_by_range_request"; +pub const BLOBS_BY_ROOTS_REQUEST: &str = "blobs_by_roots_request"; pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation"; pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate"; @@ -601,6 +597,21 @@ impl WorkEvent { } } + pub fn blobs_by_root_request( + peer_id: PeerId, + request_id: PeerRequestId, + request: BlobsByRootRequest, + ) -> Self { + Self { + drop_during_sync: false, + work: Work::BlobsByRootsRequest { + peer_id, + request_id, + request, + }, + } + } + /// Get a `str` representation of the type of work this `WorkEvent` contains. 
pub fn work_type(&self) -> &'static str { self.work.str_id() @@ -789,6 +800,11 @@ pub enum Work { request_id: PeerRequestId, request: BlobsByRangeRequest, }, + BlobsByRootsRequest { + peer_id: PeerId, + request_id: PeerRequestId, + request: BlobsByRootRequest, + }, } impl Work { @@ -813,6 +829,7 @@ impl Work { Work::BlocksByRangeRequest { .. } => BLOCKS_BY_RANGE_REQUEST, Work::BlocksByRootsRequest { .. } => BLOCKS_BY_ROOTS_REQUEST, Work::BlobsByRangeRequest { .. } => BLOBS_BY_RANGE_REQUEST, + Work::BlobsByRootsRequest { .. } => BLOBS_BY_ROOTS_REQUEST, Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION, Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE, } diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 37cc1903d3..d2391c2cfb 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -12,7 +12,6 @@ use beacon_chain::{ }; use lighthouse_network::{ Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource, - SignedBeaconBlockAndBlobsSidecar, }; use slog::{crit, debug, error, info, trace, warn}; use slot_clock::SlotClock; @@ -21,11 +20,8 @@ use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use store::hot_cold_store::HotColdDBError; use tokio::sync::mpsc; -use types::{ - Attestation, AttesterSlashing, BlobsSidecar, EthSpec, Hash256, IndexedAttestation, - ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, - SignedVoluntaryExit, Slot, SubnetId, SyncCommitteeMessage, SyncSubnetId, -}; +use types::{Attestation, AttesterSlashing, BlobsSidecar, EthSpec, Hash256, IndexedAttestation, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, SignedContributionAndProof, SignedVoluntaryExit, Slot, SubnetId, SyncCommitteeMessage, SyncSubnetId}; +use 
types::signed_block_and_blobs::BlockMaybeBlobs; use super::{ super::work_reprocessing_queue::{ @@ -659,7 +655,7 @@ impl Worker { message_id: MessageId, peer_id: PeerId, peer_client: Client, - block: Arc>, + block: BlockMaybeBlobs, reprocess_tx: mpsc::Sender>, duplicate_cache: DuplicateCache, seen_duration: Duration, @@ -697,19 +693,6 @@ impl Worker { } } - #[allow(clippy::too_many_arguments)] - pub async fn process_gossip_block_and_blobs_sidecar( - self, - message_id: MessageId, - peer_id: PeerId, - peer_client: Client, - block_and_blob: Arc>, - seen_timestamp: Duration, - ) { - //FIXME - unimplemented!() - } - /// Process the beacon block received from the gossip network and /// if it passes gossip propagation criteria, tell the network thread to forward it. /// @@ -719,7 +702,7 @@ impl Worker { message_id: MessageId, peer_id: PeerId, peer_client: Client, - block: Arc>, + block: BlockMaybeBlobs, reprocess_tx: mpsc::Sender>, seen_duration: Duration, ) -> Option> { diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index cb90813b26..d522fa90d5 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -171,6 +171,9 @@ impl Router { Request::BlobsByRange(request) => self .processor .on_blobs_by_range_request(peer_id, id, request), + Request::BlobsByRoot(request) => self + .processor + .on_blobs_by_root_request(peer_id, id, request), } } @@ -199,6 +202,10 @@ impl Router { self.processor .on_blobs_by_range_response(peer_id, request_id, beacon_blob); } + Response::BlobsByRoot(beacon_blob) => { + self.processor + .on_blobs_by_root_response(peer_id, request_id, beacon_blob); + } } } diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 2452d1826c..faa4518081 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -6,8 +6,8 @@ use crate::status::status_message; use 
crate::sync::manager::RequestId as SyncId; use crate::sync::SyncMessage; use beacon_chain::{BeaconChain, BeaconChainTypes}; -use lighthouse_network::rpc::methods::BlobsByRangeRequest; -use lighthouse_network::{rpc::*, SignedBeaconBlockAndBlobsSidecar}; +use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest}; +use lighthouse_network::{rpc::*}; use lighthouse_network::{ Client, MessageId, NetworkGlobals, PeerId, PeerRequestId, Request, Response, }; @@ -17,11 +17,7 @@ use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use store::SyncCommitteeMessage; use tokio::sync::mpsc; -use types::{ - Attestation, AttesterSlashing, BlobsSidecar, EthSpec, ProposerSlashing, - SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, - SubnetId, SyncSubnetId, -}; +use types::{Attestation, AttesterSlashing, BlobsSidecar, EthSpec, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncSubnetId}; /// Processes validated messages from the network. It relays necessary data to the syncing thread /// and processes blocks from the pubsub network. @@ -172,6 +168,18 @@ impl Processor { peer_id, request_id, request, )) } + + pub fn on_blobs_by_root_request( + &mut self, + peer_id: PeerId, + request_id: PeerRequestId, + request: BlobsByRootRequest, + ) { + self.send_beacon_processor_work(BeaconWorkEvent::blobs_by_root_request( + peer_id, request_id, request, + )) + } + /// Handle a `BlocksByRange` request from the peer. 
pub fn on_blocks_by_range_request( &mut self, diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 5c2bc65229..1b70cf7aaf 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -9,6 +9,7 @@ use slog::{debug, error, trace, warn, Logger}; use smallvec::SmallVec; use std::sync::Arc; use store::{Hash256, SignedBeaconBlock}; +use types::signed_block_and_blobs::BlockMaybeBlobs; use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent}; use crate::metrics; @@ -30,7 +31,7 @@ mod single_block_lookup; #[cfg(test)] mod tests; -pub type RootBlockTuple = (Hash256, Arc>); +pub type RootBlockTuple = (Hash256, BlockMaybeBlobs); const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60; const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 3; @@ -87,6 +88,7 @@ impl BlockLookups { let mut single_block_request = SingleBlockRequest::new(hash, peer_id); + //FIXME(sean) remove unwrap? let (peer_id, request) = single_block_request.request_block().unwrap(); if let Ok(request_id) = cx.single_block_lookup_request(peer_id, request) { self.single_block_lookups diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index 38ad59ebc4..cbd8ee243b 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -4,6 +4,7 @@ use lighthouse_network::PeerId; use std::sync::Arc; use store::{Hash256, SignedBeaconBlock}; use strum::IntoStaticStr; +use types::signed_block_and_blobs::BlockMaybeBlobs; use crate::sync::{ manager::{Id, SLOT_IMPORT_TOLERANCE}, @@ -24,7 +25,7 @@ pub(crate) struct ParentLookup { /// The root of the block triggering this parent request. chain_hash: Hash256, /// The blocks that have currently been downloaded. - downloaded_blocks: Vec>>, + downloaded_blocks: Vec>, /// Request of the last parent. 
current_parent_request: SingleBlockRequest, /// Id of the last parent request. @@ -61,7 +62,7 @@ impl ParentLookup { pub fn new( block_root: Hash256, - block: Arc>, + block: BlockMaybeBlobs, peer_id: PeerId, ) -> Self { let current_parent_request = SingleBlockRequest::new(block.parent_root(), peer_id); @@ -98,7 +99,7 @@ impl ParentLookup { self.current_parent_request.check_peer_disconnected(peer_id) } - pub fn add_block(&mut self, block: Arc>) { + pub fn add_block(&mut self, block: BlockMaybeBlobs) { let next_parent = block.parent_root(); self.downloaded_blocks.push(block); self.current_parent_request.hash = next_parent; @@ -125,7 +126,7 @@ impl ParentLookup { self.current_parent_request_id = None; } - pub fn chain_blocks(&mut self) -> Vec>> { + pub fn chain_blocks(&mut self) -> Vec> { std::mem::take(&mut self.downloaded_blocks) } @@ -133,7 +134,7 @@ impl ParentLookup { /// the processing result of the block. pub fn verify_block( &mut self, - block: Option>>, + block: Option>, failed_chains: &mut lru_cache::LRUTimeCache, ) -> Result>, VerifyError> { let root_and_block = self.current_parent_request.verify_block(block)?; diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 256a2b4297..6459fe05ee 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -8,6 +8,7 @@ use rand::seq::IteratorRandom; use ssz_types::VariableList; use store::{EthSpec, Hash256, SignedBeaconBlock}; use strum::IntoStaticStr; +use types::signed_block_and_blobs::BlockMaybeBlobs; /// Object representing a single block lookup request. #[derive(PartialEq, Eq)] @@ -105,7 +106,7 @@ impl SingleBlockRequest { /// Returns the block for processing if the response is what we expected. 
pub fn verify_block( &mut self, - block: Option>>, + block: Option>, ) -> Result>, VerifyError> { match self.state { State::AwaitingDownload => { diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index ecaf573206..789aa7f624 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -45,7 +45,7 @@ use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, EngineState}; use futures::StreamExt; use lighthouse_network::rpc::methods::MAX_REQUEST_BLOCKS; use lighthouse_network::types::{NetworkGlobals, SyncState}; -use lighthouse_network::{SignedBeaconBlockAndBlobsSidecar, SyncInfo}; +use lighthouse_network::SyncInfo; use lighthouse_network::{PeerAction, PeerId}; use slog::{crit, debug, error, info, trace, Logger}; use std::boxed::Box; @@ -53,7 +53,8 @@ use std::ops::Sub; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; -use types::{BlobsSidecar, EthSpec, Hash256, SignedBeaconBlock, Slot}; +use types::{BlobsSidecar, EthSpec, Hash256, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, Slot}; +use types::signed_block_and_blobs::BlockMaybeBlobs; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync /// from a peer. If a peer is within this tolerance (forwards or backwards), it is treated as a @@ -80,7 +81,7 @@ pub enum RequestId { } #[derive(Debug)] -/// A message than can be sent to the sync manager thread. +/// A message that can be sent to the sync manager thread. pub enum SyncMessage { /// A useful peer has been discovered. AddPeer(PeerId, SyncInfo), @@ -89,20 +90,12 @@ pub enum SyncMessage { RpcBlock { request_id: RequestId, peer_id: PeerId, - beacon_block: Option>>, - seen_timestamp: Duration, - }, - - /// A blob has been received from RPC. 
- RpcBlob { - peer_id: PeerId, - request_id: RequestId, - blob_sidecar: Option>>, + beacon_block: Option>, seen_timestamp: Duration, }, /// A block with an unknown parent has been received. - UnknownBlock(PeerId, Arc>, Hash256), + UnknownBlock(PeerId, BlockMaybeBlobs, Hash256), /// A peer has sent an object that references a block that is unknown. This triggers the /// manager to attempt to find the block matching the unknown hash. @@ -592,14 +585,6 @@ impl SyncManager { .block_lookups .parent_chain_processed(chain_hash, result, &mut self.network), }, - SyncMessage::RpcBlob { - peer_id, - request_id, - blob_sidecar, - seen_timestamp, - } => { - self.rpc_block_received(request_id, peer_id, beacon_block, seen_timestamp); - }, } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 45ade7034c..1bb378431c 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -181,6 +181,9 @@ impl SyncNetworkContext { peer_id: PeerId, request: BlocksByRootRequest, ) -> Result { + + //FIXME(sean) add prune depth logic here? 
+ trace!( self.log, "Sending BlocksByRoot Request"; diff --git a/consensus/types/src/signed_block_and_blobs.rs b/consensus/types/src/signed_block_and_blobs.rs index 35b7bfdda0..1dd1ca1cff 100644 --- a/consensus/types/src/signed_block_and_blobs.rs +++ b/consensus/types/src/signed_block_and_blobs.rs @@ -1,7 +1,9 @@ +use std::sync::Arc; use serde_derive::{Deserialize, Serialize}; +use ssz::{Decode, DecodeError}; use ssz_derive::{Decode, Encode}; use tree_hash_derive::TreeHash; -use crate::{BlobsSidecar, EthSpec, SignedBeaconBlock}; +use crate::{BlobsSidecar, EthSpec, SignedBeaconBlock, SignedBeaconBlockEip4844}; #[derive(Debug, Clone, Serialize, Deserialize, Encode, TreeHash, PartialEq)] #[serde(bound = "T: EthSpec")] @@ -10,9 +12,26 @@ pub struct SignedBeaconBlockAndBlobsSidecar { pub blobs_sidecar: BlobsSidecar, } -impl SignedBeaconBlockAndBlobsSidecar { - /// SSZ decode with fork variant determined by slot. - pub fn from_ssz_bytes(bytes: &[u8], spec: &ChainSpec) -> Result { - SignedBeaconBlock::from_ssz_bytes(bytes, spec) +impl Decode for SignedBeaconBlockAndBlobsSidecar { + fn is_ssz_fixed_len() -> bool { + todo!() + } + + fn from_ssz_bytes(bytes: &[u8]) -> Result { + todo!() + } +} + +pub enum BlockMaybeBlobs { + Block(Arc>), + BlockAndBlobs(Arc>), +} + +impl BlockMaybeBlobs { + pub fn blobs(&self) -> Option<&BlobsSidecar>{ + match self { + Self::Block(_) => None, + Self::BlockAndBlobs(block_and_blobs) => Some(&block_and_blobs.blobs_sidecar) + } } } \ No newline at end of file