From dc871566418bf01ad2dda35cf7e3e354857ad430 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Sat, 19 Nov 2022 16:53:34 -0500 Subject: [PATCH] block and blob handling progress --- beacon_node/beacon_chain/src/beacon_chain.rs | 54 +++++++-- .../beacon_chain/src/block_verification.rs | 34 +++--- .../beacon_chain/src/early_attester_cache.rs | 13 +++ .../tests/attestation_production.rs | 2 + .../lighthouse_network/src/types/pubsub.rs | 2 +- .../network/src/beacon_processor/mod.rs | 22 +++- .../beacon_processor/worker/gossip_methods.rs | 5 +- .../beacon_processor/worker/rpc_methods.rs | 106 +++++++++++++++++- beacon_node/network/src/sync/manager.rs | 10 -- .../state_processing/src/consensus_context.rs | 10 +- consensus/types/src/signed_block_and_blobs.rs | 22 +++- 11 files changed, 237 insertions(+), 43 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 7ce66d0c9d..f7534c71e0 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -915,6 +915,28 @@ impl BeaconChain { Ok(self.get_block(block_root).await?.map(Arc::new)) } + pub async fn get_block_and_blobs_checking_early_attester_cache( + &self, + block_root: &Hash256, + ) -> Result< + ( + Option>>, + Option>>, + ), + Error, + > { + if let (Some(block), Some(blobs)) = ( + self.early_attester_cache.get_block(*block_root), + self.early_attester_cache.get_blobs(*block_root), + ) { + return Ok((Some(block), Some(blobs))); + } + Ok(( + self.get_block(block_root).await?.map(Arc::new), + self.get_blobs(block_root).await?.map(Arc::new), + )) + } + /// Returns the block at the given root, if any. /// /// ## Errors @@ -923,7 +945,7 @@ impl BeaconChain { pub async fn get_block( &self, block_root: &Hash256, - ) -> Result>>, Error> { + ) -> Result>, Error> { // Load block from database, returning immediately if we have the full block w payload // stored. 
let blinded_block = match self.store.try_get_full_block(block_root)? { @@ -981,6 +1003,18 @@ impl BeaconChain { .map(Some) } + /// Returns the blobs at the given root, if any. + /// + /// ## Errors + /// + /// May return a database error. + pub async fn get_blobs( + &self, + block_root: &Hash256, + ) -> Result>, Error> { + Ok(self.store.get_blobs(block_root)?) + } + pub fn get_blinded_block( &self, block_root: &Hash256, @@ -2338,7 +2372,7 @@ impl BeaconChain { let last_index = filtered_chain_segment .iter() .position(|(_root, block)| { - block.block().slot().epoch(T::EthSpec::slots_per_epoch()) > start_epoch + block.slot().epoch(T::EthSpec::slots_per_epoch()) > start_epoch }) .unwrap_or(filtered_chain_segment.len()); @@ -2405,14 +2439,15 @@ impl BeaconChain { pub async fn verify_block_for_gossip( self: &Arc, block: Arc>, + blobs: Option>>, ) -> Result, BlockError> { let chain = self.clone(); self.task_executor .clone() .spawn_blocking_handle( move || { - let slot = block.block().slot(); - let graffiti_string = block.block().message().body().graffiti().as_utf8_lossy(); + let slot = block.slot(); + let graffiti_string = block.message().body().graffiti().as_utf8_lossy(); match GossipVerifiedBlock::new(block, &chain) { Ok(verified) => { @@ -2490,7 +2525,7 @@ impl BeaconChain { self.log, "Beacon block imported"; "block_root" => ?block_root, - "block_slot" => slot, + "block_slot" => %block.slot(), ); // Increment the Prometheus counter for block processing successes. 
@@ -2540,6 +2575,7 @@ impl BeaconChain { ) -> Result> { let ExecutionPendingBlock { block, + blobs, block_root, state, parent_block, @@ -2592,6 +2628,7 @@ impl BeaconChain { move || { chain.import_block( block, + blobs, block_root, state, confirmed_state_roots, @@ -2616,7 +2653,8 @@ impl BeaconChain { #[allow(clippy::too_many_arguments)] fn import_block( &self, - block: Arc>, + signed_block: Arc>, + blobs: Option>>, block_root: Hash256, mut state: BeaconState, confirmed_state_roots: Vec, @@ -2625,7 +2663,6 @@ impl BeaconChain { parent_block: SignedBlindedBeaconBlock, parent_eth1_finalization_data: Eth1FinalizationData, ) -> Result> { - let signed_block = block.block(); let current_slot = self.slot()?; let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch()); @@ -2867,6 +2904,7 @@ impl BeaconChain { if let Err(e) = self.early_attester_cache.add_head_block( block_root, signed_block.clone(), + blobs.clone(), proto_block, &state, &self.spec, @@ -2959,7 +2997,7 @@ impl BeaconChain { ops.push(StoreOp::PutBlock(block_root, signed_block.clone())); ops.push(StoreOp::PutState(block.state_root(), &state)); - if let Some(blobs) = block.blobs() { + if let Some(blobs) = blobs { ops.push(StoreOp::PutBlobs(block_root, blobs)); }; let txn_lock = self.store.hot_db.begin_rw_transaction(); diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index d5aaaeb1b4..0a524b50eb 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -625,6 +625,7 @@ type PayloadVerificationHandle = /// `BeaconChain` immediately after it is instantiated. pub struct ExecutionPendingBlock { pub block: Arc>, + pub blobs: Option>>, pub block_root: Hash256, pub state: BeaconState, pub parent_block: SignedBeaconBlock>, @@ -670,6 +671,7 @@ impl GossipVerifiedBlock { /// Returns an error if the block is invalid, or if the block was unable to be verified. 
pub fn new( block: Arc>, + blobs: Option>>, chain: &BeaconChain, ) -> Result> { // If the block is valid for gossip we don't supply it to the slasher here because @@ -677,7 +679,7 @@ impl GossipVerifiedBlock { // it to the slasher if an error occurs, because that's the end of this block's journey, // and it could be a repeat proposal (a likely cause for slashing!). let header = block.signed_block_header(); - Self::new_without_slasher_checks(block, chain).map_err(|e| { + Self::new_without_slasher_checks(block, blobs, chain).map_err(|e| { process_block_slash_info(chain, BlockSlashInfo::from_early_error(header, e)) }) } @@ -685,9 +687,9 @@ impl GossipVerifiedBlock { /// As for new, but doesn't pass the block to the slasher. fn new_without_slasher_checks( block: Arc>, + blobs: Option>>, chain: &BeaconChain, ) -> Result> { - let block = block.block(); // Ensure the block is the correct structure for the fork at `block.slot()`. block .fork_name(&chain.spec) @@ -881,18 +883,20 @@ impl GossipVerifiedBlock { // Validate the block's execution_payload (if any). validate_execution_payload_for_gossip(&parent_block, block.message(), chain)?; - //FIXME(sean) - // if let Some(blobs_sidecar) = block.blobs() { - // validate_blob_for_gossip(blobs_sidecar, chain)?; - // } + if let Some(blobs_sidecar) = blobs.as_ref() { + validate_blob_for_gossip(blobs_sidecar, chain)?; + //FIXME(sean) validate blobs sidecar + } // Having checked the proposer index and the block root we can cache them. 
let consensus_context = ConsensusContext::new(block.slot()) .set_current_block_root(block_root) - .set_proposer_index(block.message().proposer_index()); + .set_proposer_index(block.message().proposer_index()) + //FIXME(sean) set blobs sidecar validation results + .set_blobs_sidecar(blobs); Ok(Self { - block: block, + block, block_root, parent, consensus_context, @@ -1054,6 +1058,7 @@ impl IntoExecutionPendingBlock for SignatureVerifiedBloc ExecutionPendingBlock::from_signature_verified_components( block, + self.consensus_context.blobs(), block_root, parent, self.consensus_context, @@ -1084,10 +1089,7 @@ impl IntoExecutionPendingBlock for Arc &SignedBeaconBlock { - match self { - Self::Block(block) => block, - Self::BlockAndBlobs(block) => &block.beacon_block, - } + self } } @@ -1101,13 +1103,12 @@ impl ExecutionPendingBlock { /// Returns an error if the block is invalid, or if the block was unable to be verified. pub fn from_signature_verified_components( block: Arc>, + blobs: Option>>, block_root: Hash256, parent: PreProcessingSnapshot, mut consensus_context: ConsensusContext, chain: &Arc>, ) -> Result> { - let block = block.block(); - if let Some(parent) = chain .canonical_head .fork_choice_read_lock() @@ -1437,8 +1438,11 @@ impl ExecutionPendingBlock { }); } + //FIXME(sean) validate blobs sidecar + Ok(Self { block, + blobs, block_root, state, parent_block: parent.beacon_block, @@ -1752,7 +1756,7 @@ fn load_parent( pre_state: parent_state, beacon_state_root: Some(parent_state_root), }, - beacon_block, + block, )) }; diff --git a/beacon_node/beacon_chain/src/early_attester_cache.rs b/beacon_node/beacon_chain/src/early_attester_cache.rs index 1ddbe13241..8d16a65e86 100644 --- a/beacon_node/beacon_chain/src/early_attester_cache.rs +++ b/beacon_node/beacon_chain/src/early_attester_cache.rs @@ -20,6 +20,7 @@ pub struct CacheItem { * Values used to make the block available. 
*/ block: Arc>, + blobs: Option>>, proto_block: ProtoBlock, } @@ -50,6 +51,7 @@ impl EarlyAttesterCache { &self, beacon_block_root: Hash256, block: Arc>, + blobs: Option>>, proto_block: ProtoBlock, state: &BeaconState, spec: &ChainSpec, @@ -74,6 +76,7 @@ impl EarlyAttesterCache { source, target, block, + blobs, proto_block, }; @@ -155,6 +158,16 @@ impl EarlyAttesterCache { .map(|item| item.block.clone()) } + /// Returns the blobs, if `block_root` matches the cached item. + pub fn get_blobs(&self, block_root: Hash256) -> Option>> { + self.item + .read() + .as_ref() + .filter(|item| item.beacon_block_root == block_root) + .map(|item| item.blobs.clone()) + .flatten() + } + /// Returns the proto-array block, if `block_root` matches the cached item. pub fn get_proto_block(&self, block_root: Hash256) -> Option { self.item diff --git a/beacon_node/beacon_chain/tests/attestation_production.rs b/beacon_node/beacon_chain/tests/attestation_production.rs index 85e4f1f093..5cab585b11 100644 --- a/beacon_node/beacon_chain/tests/attestation_production.rs +++ b/beacon_node/beacon_chain/tests/attestation_production.rs @@ -142,6 +142,7 @@ async fn produces_attestations() { .add_head_block( block_root, Arc::new(block.clone()), + None, proto_block, &state, &chain.spec, @@ -198,6 +199,7 @@ async fn early_attester_cache_old_request() { .add_head_block( head.beacon_block_root, head.beacon_block.clone(), + None, head_proto_block, &head.beacon_state, &harness.chain.spec, diff --git a/beacon_node/lighthouse_network/src/types/pubsub.rs b/beacon_node/lighthouse_network/src/types/pubsub.rs index fdc4696e91..0af16ccf06 100644 --- a/beacon_node/lighthouse_network/src/types/pubsub.rs +++ b/beacon_node/lighthouse_network/src/types/pubsub.rs @@ -287,7 +287,7 @@ impl std::fmt::Display for PubsubMessage { PubsubMessage::BeaconBlockAndBlobsSidecars(block_and_blob) => write!( f, "Beacon block and Blobs Sidecar: slot: {}, blobs: {}", - block_and_blob.beacon_block.message.slot, + 
block_and_blob.beacon_block.message().slot(), block_and_blob.blobs_sidecar.blobs.len(), ), PubsubMessage::AggregateAndProofAttestation(att) => write!( diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index 1dd4629dd4..3da921ab8a 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -1541,6 +1541,7 @@ impl BeaconProcessor { peer_id, peer_client, block, + None, work_reprocessing_tx, duplicate_cache, seen_timestamp, @@ -1558,11 +1559,14 @@ impl BeaconProcessor { seen_timestamp, } => task_spawner.spawn_async(async move { worker - .process_gossip_block_and_blobs_sidecar( + .process_gossip_block( message_id, peer_id, peer_client, - block_and_blobs, + block_and_blobs.beacon_block.clone(), + Some(block_and_blobs.blobs_sidecar.clone()), + work_reprocessing_tx, + duplicate_cache, seen_timestamp, ) .await @@ -1720,6 +1724,20 @@ impl BeaconProcessor { ) }), + Work::BlobsByRootsRequest { + peer_id, + request_id, + request, + } => task_spawner.spawn_blocking_with_manual_send_idle(move |send_idle_on_drop| { + worker.handle_blobs_by_root_request( + sub_executor, + send_idle_on_drop, + peer_id, + request_id, + request, + ) + }), + Work::UnknownBlockAttestation { message_id, peer_id, diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index b55246071f..53a3d700bd 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -658,6 +658,7 @@ impl Worker { peer_id: PeerId, 
peer_client: Client, block: Arc>, + blobs: Option>>, reprocess_tx: mpsc::Sender>, seen_duration: Duration, ) -> Option> { @@ -719,7 +722,7 @@ impl Worker { let verification_result = self .chain .clone() - .verify_block_for_gossip(block.clone()) + .verify_block_for_gossip(block.clone(), blobs) .await; let block_root = if let Ok(verified_block) = &verification_result { diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index beaea38336..3c2344b49e 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -4,7 +4,9 @@ use crate::status::ToStatusMessage; use crate::sync::SyncMessage; use beacon_chain::{BeaconChainError, BeaconChainTypes, HistoricalBlockError, WhenSlotSkipped}; use itertools::process_results; -use lighthouse_network::rpc::methods::{BlobsByRangeRequest, MAX_REQUEST_BLOBS_SIDECARS}; +use lighthouse_network::rpc::methods::{ + BlobsByRangeRequest, BlobsByRootRequest, MAX_REQUEST_BLOBS_SIDECARS, +}; use lighthouse_network::rpc::StatusMessage; use lighthouse_network::rpc::*; use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo}; @@ -12,7 +14,7 @@ use slog::{debug, error}; use slot_clock::SlotClock; use std::sync::Arc; use task_executor::TaskExecutor; -use types::{Epoch, EthSpec, Hash256, Slot}; +use types::{Epoch, EthSpec, Hash256, SignedBeaconBlockAndBlobsSidecar, Slot}; use super::Worker; @@ -204,6 +206,106 @@ impl Worker { "load_blocks_by_root_blocks", ) } + /// Handle a `BlobsByRoot` request from the peer. + pub fn handle_blobs_by_root_request( + self, + executor: TaskExecutor, + send_on_drop: SendOnDrop, + peer_id: PeerId, + request_id: PeerRequestId, + request: BlobsByRootRequest, + ) { + // Fetching blocks is async because it may have to hit the execution layer for payloads. 
+ executor.spawn( + async move { + let mut send_block_count = 0; + let mut send_response = true; + for root in request.block_roots.iter() { + match self + .chain + .get_block_and_blobs_checking_early_attester_cache(root) + .await + { + Ok((Some(block), Some(blobs))) => { + self.send_response( + peer_id, + Response::BlobsByRoot(Some(Arc::new(SignedBeaconBlockAndBlobsSidecar { + beacon_block: block, + blobs_sidecar: blobs, + }))), + request_id, + ); + send_block_count += 1; + } + Ok((None, None)) => { + debug!( + self.log, + "Peer requested unknown block and blobs"; + "peer" => %peer_id, + "request_root" => ?root + ); + } + Ok((Some(_), None)) => { + debug!( + self.log, + "Peer requested block and blob, but no blob found"; + "peer" => %peer_id, + "request_root" => ?root + ); + } + Ok((None, Some(_))) => { + debug!( + self.log, + "Peer requested block and blob, but no block found"; + "peer" => %peer_id, + "request_root" => ?root + ); + } + Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => { + debug!( + self.log, + "Failed to fetch execution payload for block and blobs by root request"; + "block_root" => ?root, + "reason" => "execution layer not synced", + ); + // send the stream terminator + self.send_error_response( + peer_id, + RPCResponseErrorCode::ResourceUnavailable, + "Execution layer not synced".into(), + request_id, + ); + send_response = false; + break; + } + Err(e) => { + debug!( + self.log, + "Error fetching block for peer"; + "peer" => %peer_id, + "request_root" => ?root, + "error" => ?e, + ); + } + } + } + debug!( + self.log, + "Received BlobsByRoot Request"; + "peer" => %peer_id, + "requested" => request.block_roots.len(), + "returned" => %send_block_count + ); + + // send stream termination + if send_response { + self.send_response(peer_id, Response::BlobsByRoot(None), request_id); + } + drop(send_on_drop); + }, + "load_blobs_by_root_blocks", + ) + } /// Handle a `BlocksByRange` request from the peer. 
pub fn handle_blocks_by_range_request( diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 230a67fcfe..2459af429a 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -68,16 +68,6 @@ pub const SLOT_IMPORT_TOLERANCE: usize = 32; pub type Id = u32; -pub struct SeansBlock {} - -pub struct SeansBlob {} - -/// This is the one that has them both and goes to range. -pub struct SeansBlockBlob { - block: SeansBlock, - blob: SeansBlob, -} - /// Id of rpc requests sent by sync to the network. #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub enum RequestId { diff --git a/consensus/state_processing/src/consensus_context.rs b/consensus/state_processing/src/consensus_context.rs index 4ae1e7e23d..75374a3e41 100644 --- a/consensus/state_processing/src/consensus_context.rs +++ b/consensus/state_processing/src/consensus_context.rs @@ -1,4 +1,5 @@ use std::marker::PhantomData; +use std::sync::Arc; use tree_hash::TreeHash; use types::{ AbstractExecPayload, BeaconState, BeaconStateError, BlobsSidecar, ChainSpec, EthSpec, @@ -14,12 +15,11 @@ pub struct ConsensusContext { /// Block root of the block at `slot`. current_block_root: Option, /// Should only be populated if the sidecar has not been validated. - blobs_sidecar: Option>>, + blobs_sidecar: Option>>, /// Whether `validate_blobs_sidecar` has successfully passed. blobs_sidecar_validated: bool, /// Whether `verify_kzg_commitments_against_transactions` has successfully passed. 
blobs_verified_vs_txs: bool, - _phantom: PhantomData, } #[derive(Debug, PartialEq, Clone)] @@ -43,7 +43,6 @@ impl ConsensusContext { blobs_sidecar: None, blobs_sidecar_validated: false, blobs_verified_vs_txs: false, - _phantom: PhantomData, } } @@ -116,4 +115,9 @@ impl ConsensusContext { pub fn blobs_verified_vs_txs(&self) -> bool { self.blobs_verified_vs_txs } + + pub fn set_blobs_sidecar(mut self, blobs_sidecar: Option>>) -> Self { + self.blobs_sidecar = blobs_sidecar; + self + } } diff --git a/consensus/types/src/signed_block_and_blobs.rs b/consensus/types/src/signed_block_and_blobs.rs index 5e58a08905..8dfd8d3883 100644 --- a/consensus/types/src/signed_block_and_blobs.rs +++ b/consensus/types/src/signed_block_and_blobs.rs @@ -7,7 +7,27 @@ use tree_hash_derive::TreeHash; #[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode, TreeHash, PartialEq)] #[serde(bound = "T: EthSpec")] -pub struct SignedBeaconBlockAndBlobsSidecar { +pub struct SignedBeaconBlockAndBlobsSidecarDecode { pub beacon_block: SignedBeaconBlockEip4844, pub blobs_sidecar: BlobsSidecar, } + +#[derive(Debug, Clone, Serialize, Deserialize, Encode, TreeHash, PartialEq)] +#[serde(bound = "T: EthSpec")] +pub struct SignedBeaconBlockAndBlobsSidecar { + pub beacon_block: Arc>, + pub blobs_sidecar: Arc>, +} + +impl SignedBeaconBlockAndBlobsSidecar { + pub fn from_ssz_bytes(bytes: &[u8]) -> Result { + let SignedBeaconBlockAndBlobsSidecarDecode { + beacon_block, + blobs_sidecar, + } = SignedBeaconBlockAndBlobsSidecarDecode::from_ssz_bytes(bytes)?; + Ok(SignedBeaconBlockAndBlobsSidecar { + beacon_block: Arc::new(SignedBeaconBlock::Eip4844(beacon_block)), + blobs_sidecar: Arc::new(blobs_sidecar), + }) + } +}