From e7ee79185b2b1ae0aef415d7d1b9459c8a470d94 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Mon, 21 Nov 2022 14:09:06 -0500 Subject: [PATCH] add blobs cache and fix some block production --- beacon_node/beacon_chain/src/beacon_chain.rs | 51 ++-- beacon_node/beacon_chain/src/blob_cache.rs | 32 +++ .../beacon_chain/src/blob_verification.rs | 3 +- .../beacon_chain/src/block_verification.rs | 45 +++- beacon_node/beacon_chain/src/builder.rs | 2 + beacon_node/beacon_chain/src/lib.rs | 1 + beacon_node/execution_layer/src/lib.rs | 12 +- beacon_node/http_api/src/lib.rs | 2 - beacon_node/http_api/src/publish_blocks.rs | 37 ++- .../src/rpc/codec/ssz_snappy.rs | 2 +- .../lighthouse_network/src/rpc/methods.rs | 8 +- .../src/service/api_types.rs | 2 +- .../network/src/beacon_processor/mod.rs | 2 +- .../beacon_processor/worker/rpc_methods.rs | 249 +++++++++--------- beacon_node/network/src/router/processor.rs | 34 ++- beacon_node/network/src/sync/manager.rs | 18 +- .../state_processing/src/consensus_context.rs | 4 + 17 files changed, 315 insertions(+), 189 deletions(-) create mode 100644 beacon_node/beacon_chain/src/blob_cache.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index f7534c71e0..49a96cbb48 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -6,6 +6,7 @@ use crate::attestation_verification::{ use crate::attester_cache::{AttesterCache, AttesterCacheKey}; use crate::beacon_proposer_cache::compute_proposer_duties_from_head; use crate::beacon_proposer_cache::BeaconProposerCache; +use crate::blob_cache::BlobCache; use crate::block_times_cache::BlockTimesCache; use crate::block_verification::{ check_block_is_finalized_descendant, check_block_relevancy, get_block_root, @@ -389,6 +390,7 @@ pub struct BeaconChain { pub slasher: Option>>, /// Provides monitoring of a set of explicitly defined validators. pub validator_monitor: RwLock>, + pub blob_cache: BlobCache, } type BeaconBlockAndState = (BeaconBlock, BeaconState); @@ -2360,8 +2362,6 @@ impl BeaconChain { }; while let Some((_root, block)) = filtered_chain_segment.first() { - let block: &SignedBeaconBlock = block.block(); - // Determine the epoch of the first block in the remaining segment. let start_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch()); @@ -2449,7 +2449,7 @@ impl BeaconChain { let slot = block.slot(); let graffiti_string = block.message().body().graffiti().as_utf8_lossy(); - match GossipVerifiedBlock::new(block, &chain) { + match GossipVerifiedBlock::new(block, blobs, &chain) { Ok(verified) => { debug!( chain.log, @@ -2505,6 +2505,8 @@ impl BeaconChain { // Increment the Prometheus counter for block processing requests. metrics::inc_counter(&metrics::BLOCK_PROCESSING_REQUESTS); + let slot = unverified_block.block().slot(); + // A small closure to group the verification and import errors. let chain = self.clone(); let import_block = async move { @@ -2515,8 +2517,6 @@ impl BeaconChain { .await }; - let slot = unverified_block.block().slot(); - // Verify and import the block. match import_block.await { // The block was successfully verified and imported. Yay. @@ -2525,7 +2525,7 @@ impl BeaconChain { self.log, "Beacon block imported"; "block_root" => ?block_root, - "block_slot" => %block.slot(), + "block_slot" => slot, ); // Increment the Prometheus counter for block processing successes. @@ -3693,6 +3693,8 @@ impl BeaconChain { prepare_payload_handle: _, } = partial_beacon_block; + let (payload, kzg_commitments_opt, blobs) = block_contents.deconstruct(); + let inner_block = match &state { BeaconState::Base(_) => BeaconBlock::Base(BeaconBlockBase { slot, @@ -3746,8 +3748,7 @@ impl BeaconChain { voluntary_exits: voluntary_exits.into(), sync_aggregate: sync_aggregate .ok_or(BlockProductionError::MissingSyncAggregate)?, - execution_payload: block_contents - .to_payload() + execution_payload: payload .try_into() .map_err(|_| BlockProductionError::InvalidPayloadFork)?, }, @@ -3768,16 +3769,14 @@ impl BeaconChain { voluntary_exits: voluntary_exits.into(), sync_aggregate: sync_aggregate .ok_or(BlockProductionError::MissingSyncAggregate)?, - execution_payload: block_contents - .to_payload() + execution_payload: payload .try_into() .map_err(|_| BlockProductionError::InvalidPayloadFork)?, }, }), BeaconState::Eip4844(_) => { - let kzg_commitments = block_contents - .kzg_commitments() - .ok_or(BlockProductionError::InvalidPayloadFork)?; + let kzg_commitments = + kzg_commitments_opt.ok_or(BlockProductionError::InvalidPayloadFork)?; BeaconBlock::Eip4844(BeaconBlockEip4844 { slot, proposer_index, @@ -3794,11 +3793,10 @@ impl BeaconChain { voluntary_exits: voluntary_exits.into(), sync_aggregate: sync_aggregate .ok_or(BlockProductionError::MissingSyncAggregate)?, - execution_payload: block_contents - .to_payload() + execution_payload: payload .try_into() .map_err(|_| BlockProductionError::InvalidPayloadFork)?, - blob_kzg_commitments: VariableList::from(kzg_commitments.to_vec()), + blob_kzg_commitments: VariableList::from(kzg_commitments), }, }) } @@ -3828,8 +3826,13 @@ impl BeaconChain { ProduceBlockVerification::VerifyRandao => BlockSignatureStrategy::VerifyRandao, ProduceBlockVerification::NoVerification => BlockSignatureStrategy::NoVerification, }; + // Use a context without block root or proposer index so that both are checked. - let mut ctxt = ConsensusContext::new(block.slot()); + let mut ctxt = ConsensusContext::new(block.slot()) + //FIXME(sean) This is a hack beacuse `valdiate blobs sidecar requires the block root` + // which we won't have until after the state root is calculated. + .set_blobs_sidecar_validated(true); + per_block_processing( &mut state, &block, @@ -3847,6 +3850,20 @@ impl BeaconChain { let (mut block, _) = block.deconstruct(); *block.state_root_mut() = state_root; + //FIXME(sean) + // - generate kzg proof + // - validate blobs then cache them + if let Some(blobs) = blobs { + let beacon_block_root = block.canonical_root(); + let blobs_sidecar = BlobsSidecar { + beacon_block_slot: slot, + beacon_block_root, + blobs: VariableList::from(blobs), + kzg_aggregate_proof: KzgProof::default(), + }; + self.blob_cache.put(beacon_block_root, blobs_sidecar); + } + metrics::inc_counter(&metrics::BLOCK_PRODUCTION_SUCCESSES); trace!( diff --git a/beacon_node/beacon_chain/src/blob_cache.rs b/beacon_node/beacon_chain/src/blob_cache.rs new file mode 100644 index 0000000000..7f057ad9ed --- /dev/null +++ b/beacon_node/beacon_chain/src/blob_cache.rs @@ -0,0 +1,32 @@ +use lru::LruCache; +use parking_lot::Mutex; +use tree_hash::TreeHash; +use types::{BlobsSidecar, EthSpec, ExecutionPayload, Hash256}; + +pub const DEFAULT_BLOB_CACHE_SIZE: usize = 10; + +/// A cache blobs by beacon block root. +pub struct BlobCache { + blobs: Mutex>>, +} + +#[derive(Hash, PartialEq, Eq)] +struct BlobCacheId(Hash256); + +impl Default for BlobCache { + fn default() -> Self { + BlobCache { + blobs: Mutex::new(LruCache::new(DEFAULT_BLOB_CACHE_SIZE)), + } + } +} + +impl BlobCache { + pub fn put(&self, beacon_block: Hash256, blobs: BlobsSidecar) -> Option> { + self.blobs.lock().put(BlobCacheId(beacon_block), blobs) + } + + pub fn pop(&self, root: &Hash256) -> Option> { + self.blobs.lock().pop(&BlobCacheId(*root)) + } +} diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index d43eb7ea77..ff313e19a4 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -7,6 +7,7 @@ use crate::BeaconChainError; use bls::PublicKey; use types::{consts::eip4844::BLS_MODULUS, BeaconStateError, BlobsSidecar, Hash256, Slot}; +#[derive(Debug)] pub enum BlobError { /// The blob sidecar is from a slot that is later than the current slot (with respect to the /// gossip clock disparity). @@ -82,7 +83,7 @@ impl From for BlobError { pub fn validate_blob_for_gossip( blob_sidecar: &BlobsSidecar, - chain: &Arc>, + chain: &BeaconChain, ) -> Result<(), BlobError> { let blob_slot = blob_sidecar.beacon_block_slot; // Do not gossip or process blobs from future or past slots. diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 0a524b50eb..c311d5b540 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -42,7 +42,7 @@ //! END //! //! ``` -use crate::blob_verification::validate_blob_for_gossip; +use crate::blob_verification::{validate_blob_for_gossip, BlobError}; use crate::eth1_finalization_cache::Eth1FinalizationData; use crate::execution_payload::{ is_optimistic_candidate_block, validate_execution_payload_for_gossip, validate_merge_block, @@ -51,6 +51,7 @@ use crate::execution_payload::{ use crate::snapshot_cache::PreProcessingSnapshot; use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS; use crate::validator_pubkey_cache::ValidatorPubkeyCache; +use crate::BlockError::BlobValidation; use crate::{ beacon_chain::{ BeaconForkChoice, BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT, MAXIMUM_GOSSIP_CLOCK_DISPARITY, @@ -138,7 +139,10 @@ pub enum BlockError { /// its parent. ParentUnknown(Arc>), /// The block skips too many slots and is a DoS risk. - TooManySkippedSlots { parent_slot: Slot, block_slot: Slot }, + TooManySkippedSlots { + parent_slot: Slot, + block_slot: Slot, + }, /// The block slot is greater than the present slot. /// /// ## Peer scoring @@ -153,7 +157,10 @@ pub enum BlockError { /// ## Peer scoring /// /// The peer has incompatible state transition logic and is faulty. - StateRootMismatch { block: Hash256, local: Hash256 }, + StateRootMismatch { + block: Hash256, + local: Hash256, + }, /// The block was a genesis block, these blocks cannot be re-imported. GenesisBlock, /// The slot is finalized, no need to import. @@ -172,7 +179,9 @@ pub enum BlockError { /// /// It's unclear if this block is valid, but it conflicts with finality and shouldn't be /// imported. - NotFinalizedDescendant { block_parent_root: Hash256 }, + NotFinalizedDescendant { + block_parent_root: Hash256, + }, /// Block is already known, no need to re-import. /// /// ## Peer scoring @@ -185,7 +194,10 @@ pub enum BlockError { /// /// The `proposer` has already proposed a block at this slot. The existing block may or may not /// be equal to the given block. - RepeatProposal { proposer: u64, slot: Slot }, + RepeatProposal { + proposer: u64, + slot: Slot, + }, /// The block slot exceeds the MAXIMUM_BLOCK_SLOT_NUMBER. /// /// ## Peer scoring @@ -200,7 +212,10 @@ pub enum BlockError { /// ## Peer scoring /// /// The block is invalid and the peer is faulty. - IncorrectBlockProposer { block: u64, local_shuffling: u64 }, + IncorrectBlockProposer { + block: u64, + local_shuffling: u64, + }, /// The proposal signature in invalid. /// /// ## Peer scoring @@ -224,7 +239,10 @@ pub enum BlockError { /// ## Peer scoring /// /// The block is invalid and the peer is faulty. - BlockIsNotLaterThanParent { block_slot: Slot, parent_slot: Slot }, + BlockIsNotLaterThanParent { + block_slot: Slot, + parent_slot: Slot, + }, /// At least one block in the chain segment did not have it's parent root set to the root of /// the prior block. /// @@ -280,7 +298,10 @@ pub enum BlockError { /// /// The peer sent us an invalid block, but I'm not really sure how to score this in an /// "optimistic" sync world. - ParentExecutionPayloadInvalid { parent_root: Hash256 }, + ParentExecutionPayloadInvalid { + parent_root: Hash256, + }, + BlobValidation(BlobError), } /// Returned when block validation failed due to some issue verifying @@ -625,7 +646,7 @@ type PayloadVerificationHandle = /// `BeaconChain` immediately after it is instantiated. pub struct ExecutionPendingBlock { pub block: Arc>, - pub blobs: Option>>, + pub blobs: Option>>, pub block_root: Hash256, pub state: BeaconState, pub parent_block: SignedBeaconBlock>, @@ -884,7 +905,7 @@ impl GossipVerifiedBlock { validate_execution_payload_for_gossip(&parent_block, block.message(), chain)?; if let Some(blobs_sidecar) = blobs.as_ref() { - validate_blob_for_gossip(blobs_sidecar, chain)?; + validate_blob_for_gossip(blobs_sidecar, chain).map_err(BlobValidation)?; //FIXME(sean) validate blobs sidecar } @@ -1058,7 +1079,7 @@ impl IntoExecutionPendingBlock for SignatureVerifiedBloc ExecutionPendingBlock::from_signature_verified_components( block, - self.consensus_context.blobs(), + self.consensus_context.blobs_sidecar(), block_root, parent, self.consensus_context, @@ -1103,7 +1124,7 @@ impl ExecutionPendingBlock { /// Returns an error if the block is invalid, or if the block was unable to be verified. pub fn from_signature_verified_components( block: Arc>, - blobs: Option>>, + blobs: Option>>, block_root: Hash256, parent: PreProcessingSnapshot, mut consensus_context: ConsensusContext, diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 58bbb2b5c6..9c85961637 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -1,4 +1,5 @@ use crate::beacon_chain::{CanonicalHead, BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, OP_POOL_DB_KEY}; +use crate::blob_cache::BlobCache; use crate::eth1_chain::{CachingEth1Backend, SszEth1}; use crate::eth1_finalization_cache::Eth1FinalizationCache; use crate::fork_choice_signal::ForkChoiceSignalTx; @@ -810,6 +811,7 @@ where graffiti: self.graffiti, slasher: self.slasher.clone(), validator_monitor: RwLock::new(validator_monitor), + blob_cache: BlobCache::default(), }; let head = beacon_chain.head_snapshot(); diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 615559608f..19b90de1a5 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -5,6 +5,7 @@ mod beacon_chain; mod beacon_fork_choice_store; pub mod beacon_proposer_cache; mod beacon_snapshot; +pub mod blob_cache; pub mod blob_verification; pub mod block_reward; mod block_times_cache; diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 04bdb4a20d..05d45c07a7 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -4,7 +4,6 @@ //! This crate only provides useful functionality for "The Merge", it does not provide any of the //! deposit-contract functionality that the `beacon_node/eth1` crate already provides. -use crate::json_structures::JsonBlobBundles; use crate::payload_cache::PayloadCache; use auth::{strip_prefix, Auth, JwtKey}; use builder_client::BuilderHttpClient; @@ -100,6 +99,17 @@ pub enum BlockProposalContents> { } impl> BlockProposalContents { + pub fn deconstruct(self) -> (Payload, Option>, Option>>) { + match self { + Self::Payload(payload) => (payload, None, None), + Self::PayloadAndBlobs { + payload, + kzg_commitments, + blobs, + } => (payload, Some(kzg_commitments), Some(blobs)), + } + } + pub fn payload(&self) -> &Payload { match self { Self::Payload(payload) => payload, diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index a747430eee..3c0f70fb57 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1047,8 +1047,6 @@ pub fn serve( chain: Arc>, network_tx: UnboundedSender>, log: Logger| async move { - // need to have cached the blob sidecar somewhere in the beacon chain - // to publish publish_blocks::publish_block(None, block, None, chain, &network_tx, log) .await .map(|()| warp::reply()) diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 0167da8d47..0305213cc9 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -10,7 +10,8 @@ use tokio::sync::mpsc::UnboundedSender; use tree_hash::TreeHash; use types::{ AbstractExecPayload, BlindedPayload, BlobsSidecar, EthSpec, ExecPayload, ExecutionBlockHash, - FullPayload, Hash256, SignedBeaconBlock, SignedBeaconBlockEip4844, + FullPayload, Hash256, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, + SignedBeaconBlockEip4844, }; use warp::Rejection; @@ -18,31 +19,30 @@ use warp::Rejection; pub async fn publish_block( block_root: Option, block: Arc>, - blobs_sidecar: Option>>, chain: Arc>, network_tx: &UnboundedSender>, log: Logger, ) -> Result<(), Rejection> { let seen_timestamp = timestamp_now(); + //FIXME(sean) have to move this to prior to publishing because it's included in the blobs sidecar message. + //this may skew metrics + let block_root = block_root.unwrap_or_else(|| block.canonical_root()); + // Send the block, regardless of whether or not it is valid. The API // specification is very clear that this is the desired behaviour. - - let message = match &*block { - SignedBeaconBlock::Eip4844(block) => { - if let Some(sidecar) = blobs_sidecar { - PubsubMessage::BeaconBlockAndBlobsSidecars(Arc::new( - SignedBeaconBlockAndBlobsSidecar { - beacon_block: block.clone(), - blobs_sidecar: (*sidecar).clone(), - }, - )) - } else { - //TODO(pawan): return an empty sidecar instead - return Err(warp_utils::reject::broadcast_without_import(format!(""))); - } + let message = if matches!(block, SignedBeaconBlock::Eip4844(_)) { + if let Some(sidecar) = chain.blob_cache.pop(&block_root) { + PubsubMessage::BeaconBlockAndBlobsSidecars(Arc::new(SignedBeaconBlockAndBlobsSidecar { + beacon_block: block, + blobs_sidecar: Arc::new(sidecar), + })) + } else { + //FIXME(sean): This should probably return a specific no - blob cached error code, beacon API coordination required + return Err(warp_utils::reject::broadcast_without_import(format!(""))); } - _ => PubsubMessage::BeaconBlock(block.clone()), + } else { + PubsubMessage::BeaconBlock(block.clone()) }; crate::publish_pubsub_message(network_tx, message)?; @@ -50,8 +50,6 @@ pub async fn publish_block( let delay = get_block_delay_ms(seen_timestamp, block.message(), &chain.slot_clock); metrics::observe_duration(&metrics::HTTP_API_BLOCK_BROADCAST_DELAY_TIMES, delay); - let block_root = block_root.unwrap_or_else(|| block.canonical_root()); - match chain .process_block(block_root, block.clone(), CountUnrealized::True) .await @@ -153,7 +151,6 @@ pub async fn publish_blinded_block( publish_block::( Some(block_root), Arc::new(full_block), - None, chain, network_tx, log, diff --git a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs index ef06b2b714..048fa67052 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs @@ -574,7 +574,7 @@ fn handle_v1_response( })?; match fork_name { ForkName::Eip4844 => Ok(Some(RPCResponse::BlobsByRange(Arc::new( - SignedBeaconBlockAndBlobsSidecar::from_ssz_bytes(decoded_buffer)?, + BlobsSidecar::from_ssz_bytes(decoded_buffer)?, )))), _ => Err(RPCError::ErrorResponse( RPCResponseErrorCode::InvalidRequest, diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 2a2d12f3f2..1f9df0a0b9 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -266,7 +266,7 @@ pub enum RPCResponse { BlocksByRoot(Arc>), /// A response to a get BLOBS_BY_RANGE request - BlobsByRange(Arc>), + BlobsByRange(Arc>), /// A response to a get BLOBS_BY_ROOT request. BlobsByRoot(Arc>), @@ -427,11 +427,7 @@ impl std::fmt::Display for RPCResponse { write!(f, "BlocksByRoot: Block slot: {}", block.slot()) } RPCResponse::BlobsByRange(blob) => { - write!( - f, - "BlobsByRange: Blob slot: {}", - blob.blobs_sidecar.beacon_block_slot - ) + write!(f, "BlobsByRange: Blob slot: {}", blob.beacon_block_slot) } RPCResponse::BlobsByRoot(blob) => { write!( diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index 8dae3e25e1..b9360aeacc 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -73,7 +73,7 @@ pub enum Response { /// A response to a get BLOCKS_BY_RANGE request. A None response signals the end of the batch. BlocksByRange(Option>>), /// A response to a get BLOBS_BY_RANGE request. A None response signals the end of the batch. - BlobsByRange(Option>>), + BlobsByRange(Option>>), /// A response to a get BLOCKS_BY_ROOT request. BlocksByRoot(Option>>), /// A response to a get BLOBS_BY_ROOT request. diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index 3da921ab8a..7025711e59 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -1729,7 +1729,7 @@ impl BeaconProcessor { request_id, request, } => task_spawner.spawn_blocking_with_manual_send_idle(move |send_idle_on_drop| { - worker.handle_blocks_by_root_request( + worker.handle_blobs_by_root_request( sub_executor, send_idle_on_drop, peer_id, diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 3c2344b49e..4a22266ab4 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -12,6 +12,7 @@ use lighthouse_network::rpc::*; use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo}; use slog::{debug, error}; use slot_clock::SlotClock; +use ssz_types::VariableList; use std::sync::Arc; use task_executor::TaskExecutor; use types::{Epoch, EthSpec, Hash256, SignedBeaconBlockAndBlobsSidecar, Slot}; @@ -498,129 +499,129 @@ impl Worker { //FIXME(sean) create the blobs iter - // let forwards_block_root_iter = match self - // .chain - // .forwards_iter_block_roots(Slot::from(req.start_slot)) - // { - // Ok(iter) => iter, - // Err(BeaconChainError::HistoricalBlockError( - // HistoricalBlockError::BlockOutOfRange { - // slot, - // oldest_block_slot, - // }, - // )) => { - // debug!(self.log, "Range request failed during backfill"; "requested_slot" => slot, "oldest_known_slot" => oldest_block_slot); - // return self.send_error_response( - // peer_id, - // RPCResponseErrorCode::ResourceUnavailable, - // "Backfilling".into(), - // request_id, - // ); - // } - // Err(e) => return error!(self.log, "Unable to obtain root iter"; "error" => ?e), - // }; - // - // // Pick out the required blocks, ignoring skip-slots. - // let mut last_block_root = None; - // let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { - // iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count)) - // // map skip slots to None - // .map(|(root, _)| { - // let result = if Some(root) == last_block_root { - // None - // } else { - // Some(root) - // }; - // last_block_root = Some(root); - // result - // }) - // .collect::>>() - // }); - // - // let block_roots = match maybe_block_roots { - // Ok(block_roots) => block_roots, - // Err(e) => return error!(self.log, "Error during iteration over blocks"; "error" => ?e), - // }; - // - // // remove all skip slots - // let block_roots = block_roots.into_iter().flatten().collect::>(); - // - // // Fetching blocks is async because it may have to hit the execution layer for payloads. - // executor.spawn( - // async move { - // let mut blocks_sent = 0; - // let mut send_response = true; - // - // for root in block_roots { - // match self.chain.store.get_blobs(&root) { - // Ok(Some(blob)) => { - // blocks_sent += 1; - // self.send_network_message(NetworkMessage::SendResponse { - // peer_id, - // response: Response::BlobsByRange(Some(Arc::new(VariableList::new(vec![blob.message]).unwrap()))), - // id: request_id, - // }); - // } - // Ok(None) => { - // error!( - // self.log, - // "Blob in the chain is not in the store"; - // "request_root" => ?root - // ); - // break; - // } - // Err(e) => { - // error!( - // self.log, - // "Error fetching block for peer"; - // "block_root" => ?root, - // "error" => ?e - // ); - // break; - // } - // } - // } - // - // let current_slot = self - // .chain - // .slot() - // .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()); - // - // if blocks_sent < (req.count as usize) { - // debug!( - // self.log, - // "BlocksByRange Response processed"; - // "peer" => %peer_id, - // "msg" => "Failed to return all requested blocks", - // "start_slot" => req.start_slot, - // "current_slot" => current_slot, - // "requested" => req.count, - // "returned" => blocks_sent - // ); - // } else { - // debug!( - // self.log, - // "BlocksByRange Response processed"; - // "peer" => %peer_id, - // "start_slot" => req.start_slot, - // "current_slot" => current_slot, - // "requested" => req.count, - // "returned" => blocks_sent - // ); - // } - // - // if send_response { - // // send the stream terminator - // self.send_network_message(NetworkMessage::SendResponse { - // peer_id, - // response: Response::BlobsByRange(None), - // id: request_id, - // }); - // } - // - // drop(send_on_drop); - // }, - // "load_blocks_by_range_blocks", - // ); + let forwards_blob_root_iter = match self + .chain + .forwards_iter_block_roots(Slot::from(req.start_slot)) + { + Ok(iter) => iter, + Err(BeaconChainError::HistoricalBlockError( + HistoricalBlockError::BlockOutOfRange { + slot, + oldest_block_slot, + }, + )) => { + debug!(self.log, "Range request failed during backfill"; "requested_slot" => slot, "oldest_known_slot" => oldest_block_slot); + return self.send_error_response( + peer_id, + RPCResponseErrorCode::ResourceUnavailable, + "Backfilling".into(), + request_id, + ); + } + Err(e) => return error!(self.log, "Unable to obtain root iter"; "error" => ?e), + }; + + // Pick out the required blocks, ignoring skip-slots. + let mut last_block_root = None; + let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { + iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count)) + // map skip slots to None + .map(|(root, _)| { + let result = if Some(root) == last_block_root { + None + } else { + Some(root) + }; + last_block_root = Some(root); + result + }) + .collect::>>() + }); + + let block_roots = match maybe_block_roots { + Ok(block_roots) => block_roots, + Err(e) => return error!(self.log, "Error during iteration over blocks"; "error" => ?e), + }; + + // remove all skip slots + let block_roots = block_roots.into_iter().flatten().collect::>(); + + // Fetching blocks is async because it may have to hit the execution layer for payloads. + executor.spawn( + async move { + let mut blocks_sent = 0; + let mut send_response = true; + + for root in block_roots { + match self.chain.store.get_blobs(&root) { + Ok(Some(blob)) => { + blocks_sent += 1; + self.send_network_message(NetworkMessage::SendResponse { + peer_id, + response: Response::BlobsByRange(Some(Arc::new(blob))), + id: request_id, + }); + } + Ok(None) => { + error!( + self.log, + "Blob in the chain is not in the store"; + "request_root" => ?root + ); + break; + } + Err(e) => { + error!( + self.log, + "Error fetching blob for peer"; + "block_root" => ?root, + "error" => ?e + ); + break; + } + } + } + + let current_slot = self + .chain + .slot() + .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()); + + if blobs_sent < (req.count as usize) { + debug!( + self.log, + "BlobsByRange Response processed"; + "peer" => %peer_id, + "msg" => "Failed to return all requested blocks", + "start_slot" => req.start_slot, + "current_slot" => current_slot, + "requested" => req.count, + "returned" => blobs_sent + ); + } else { + debug!( + self.log, + "BlobsByRange Response processed"; + "peer" => %peer_id, + "start_slot" => req.start_slot, + "current_slot" => current_slot, + "requested" => req.count, + "returned" => blobs_sent + ); + } + + if send_response { + // send the stream terminator + self.send_network_message(NetworkMessage::SendResponse { + peer_id, + response: Response::BlobsByRange(None), + id: request_id, + }); + } + + drop(send_on_drop); + }, + "load_blocks_by_range_blocks", + ); } } diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 432b11b889..f6cdb09826 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -232,7 +232,7 @@ impl Processor { &mut self, peer_id: PeerId, request_id: RequestId, - blob_wrapper: Option>>, + blob_sidecar: Option>>, ) { trace!( self.log, @@ -244,7 +244,7 @@ impl Processor { self.send_to_sync(SyncMessage::RpcBlob { peer_id, request_id: id, - blob_sidecar: blob_wrapper, + blob_sidecar, seen_timestamp: timestamp_now(), }); } else { @@ -285,6 +285,36 @@ impl Processor { }); } + /// Handle a `BlobsByRoot` response from the peer. + pub fn on_blobs_by_root_response( + &mut self, + peer_id: PeerId, + request_id: RequestId, + block_and_blobs: Option>>, + ) { + let request_id = match request_id { + RequestId::Sync(sync_id) => match sync_id { + id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id, + SyncId::BackFillSync { .. } | SyncId::RangeSync { .. } => { + unreachable!("Batch syncing do not request BBRoot requests") + } + }, + RequestId::Router => unreachable!("All BBRoot requests belong to sync"), + }; + + trace!( + self.log, + "Received BlockAndBlobssByRoot Response"; + "peer" => %peer_id, + ); + self.send_to_sync(SyncMessage::RpcBlockAndBlob { + peer_id, + request_id, + block_and_blobs, + seen_timestamp: timestamp_now(), + }); + } + /// Process a gossip message declaring a new block. /// /// Attempts to apply to block to the beacon chain. May queue the block for later processing. diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 2459af429a..3d3be5c097 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -97,6 +97,22 @@ pub enum SyncMessage { seen_timestamp: Duration, }, + /// A blob has been received from the RPC. + RpcBlob { + request_id: RequestId, + peer_id: PeerId, + blob_sidecar: Option>>, + seen_timestamp: Duration, + }, + + /// A block and blobs have been received from the RPC. + RpcBlockAndBlob { + request_id: RequestId, + peer_id: PeerId, + block_and_blobs: Option>>, + seen_timestamp: Duration, + }, + /// A block with an unknown parent has been received. UnknownBlock(PeerId, Arc>, Hash256), @@ -729,7 +745,7 @@ impl SyncManager { &mut self, request_id: RequestId, peer_id: PeerId, - beacon_block: Option, + beacon_block: Option>>, seen_timestamp: Duration, ) { let RequestId::RangeBlockBlob { id } = request_id else { diff --git a/consensus/state_processing/src/consensus_context.rs b/consensus/state_processing/src/consensus_context.rs index 75374a3e41..f66e578fe1 100644 --- a/consensus/state_processing/src/consensus_context.rs +++ b/consensus/state_processing/src/consensus_context.rs @@ -120,4 +120,8 @@ impl ConsensusContext { self.blobs_sidecar = blobs_sidecar; self } + + pub fn blobs_sidecar(&self) -> Option>> { + self.blobs_sidecar.clone() + } }