diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 1afff4a958..4986f6251a 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1939,8 +1939,7 @@ impl BeaconChain { self: &Arc, blob_sidecar: SignedBlobSidecar, subnet_id: u64, - ) -> Result, BlobError> // TODO(pawan): make a GossipVerifedBlob type - { + ) -> Result, BlobError> { blob_verification::validate_blob_sidecar_for_gossip(blob_sidecar, subnet_id, self) } @@ -2626,7 +2625,18 @@ impl BeaconChain { ) .await { - Ok(_) => imported_blocks += 1, + Ok(status) => { + imported_blocks += 1; + match status { + AvailabilityProcessingStatus::Imported(_) => { + // The block was imported successfully. + } + AvailabilityProcessingStatus::PendingBlobs(blobs) => {} + AvailabilityProcessingStatus::PendingBlock(_) => { + // doesn't makes sense + } + } + } Err(error) => { return ChainSegmentResult::Failed { imported_blocks, diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index 48ad45e83b..c682517e49 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -12,6 +12,7 @@ use crate::data_availability_checker::{ use crate::kzg_utils::{validate_blob, validate_blobs}; use crate::BeaconChainError; use kzg::Kzg; +use types::blob_sidecar::BlobIdentifier; use types::{ BeaconBlockRef, BeaconStateError, BlobSidecar, BlobSidecarList, Epoch, EthSpec, Hash256, KzgCommitment, SignedBeaconBlock, SignedBeaconBlockHeader, SignedBlobSidecar, Slot, @@ -136,9 +137,15 @@ pub struct GossipVerifiedBlob { } impl GossipVerifiedBlob { + pub fn id(&self) -> BlobIdentifier { + self.blob.id() + } pub fn block_root(&self) -> Hash256 { self.blob.block_root } + pub fn to_blob(self) -> Arc> { + self.blob + } } pub fn validate_blob_sidecar_for_gossip( @@ -287,19 +294,14 @@ impl KzgVerifiedBlob { } pub fn verify_kzg_for_blob( - blob: GossipVerifiedBlob, + blob: Arc>, kzg: &Kzg, ) -> Result, AvailabilityCheckError> { //TODO(sean) remove clone - if validate_blob::( - kzg, - blob.blob.blob.clone(), - blob.blob.kzg_commitment, - blob.blob.kzg_proof, - ) - .map_err(AvailabilityCheckError::Kzg)? + if validate_blob::(kzg, blob.blob.clone(), blob.kzg_commitment, blob.kzg_proof) + .map_err(AvailabilityCheckError::Kzg)? { - Ok(KzgVerifiedBlob { blob: blob.blob }) + Ok(KzgVerifiedBlob { blob }) } else { Err(AvailabilityCheckError::KzgVerificationFailed) } @@ -449,6 +451,15 @@ pub enum BlockWrapper { BlockAndBlobs(Arc>, Vec>>), } +impl BlockWrapper { + pub fn deconstruct(self) -> (Arc>, Option>>>) { + match self { + BlockWrapper::Block(block) => (block, None), + BlockWrapper::BlockAndBlobs(block, blobs) => (block, Some(blobs)), + } + } +} + impl AsBlock for BlockWrapper { fn slot(&self) -> Slot { self.as_block().slot() diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 0cf26f12f5..95bc3ed87a 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -63,6 +63,7 @@ pub struct DataAvailabilityChecker { struct GossipBlobCache { verified_blobs: Vec>, executed_block: Option>, + missing_blob_ids: Vec, } impl GossipBlobCache { @@ -70,13 +71,16 @@ impl GossipBlobCache { Self { verified_blobs: vec![blob], executed_block: None, + missing_blob_ids: vec![], } } fn new_from_block(block: AvailabilityPendingExecutedBlock) -> Self { + let missing_blob_ids = block.get_all_blob_ids(); Self { verified_blobs: vec![], executed_block: Some(block), + missing_blob_ids, } } @@ -117,6 +121,22 @@ impl DataAvailabilityChecker { self.rpc_blob_cache.read().get(blob_id).cloned() } + pub fn put_rpc_blob( + &self, + blob: Arc>, + ) -> Result, AvailabilityCheckError> { + // Verify the KZG commitment. + let kzg_verified_blob = if let Some(kzg) = self.kzg.as_ref() { + verify_kzg_for_blob(blob, kzg)? + } else { + return Err(AvailabilityCheckError::KzgNotInitialized); + }; + + self.put_kzg_verified_blob(kzg_verified_blob, |blob_id, missing_blob_ids| { + missing_blob_ids.contains(&blob_id) + }) + } + /// This first validate the KZG commitments included in the blob sidecar. /// Check if we've cached other blobs for this block. If it completes a set and we also /// have a block cached, return the Availability variant triggering block import. @@ -127,17 +147,23 @@ impl DataAvailabilityChecker { &self, gossip_blob: GossipVerifiedBlob, ) -> Result, AvailabilityCheckError> { - let block_root = gossip_blob.block_root(); - // Verify the KZG commitments. let kzg_verified_blob = if let Some(kzg) = self.kzg.as_ref() { - verify_kzg_for_blob(gossip_blob, kzg)? + verify_kzg_for_blob(gossip_blob.to_blob(), kzg)? } else { return Err(AvailabilityCheckError::KzgNotInitialized); }; - let blob = kzg_verified_blob.clone_blob(); + self.put_kzg_verified_blob(kzg_verified_blob, |_, _| true) + } + fn put_kzg_verified_blob( + &self, + kzg_verified_blob: KzgVerifiedBlob, + predicate: impl FnOnce(BlobIdentifier, &[BlobIdentifier]) -> bool, + ) -> Result, AvailabilityCheckError> { + let blob = kzg_verified_blob.clone_blob(); + let blob_id = blob.id(); let mut blob_cache = self.gossip_blob_cache.lock(); // Gossip cache. @@ -147,6 +173,10 @@ impl DataAvailabilityChecker { // should filter duplicates, as well as validate indices. let cache = occupied_entry.get_mut(); + if !predicate(blob_id, cache.missing_blob_ids.as_slice()) { + // ignore this blob + } + cache .verified_blobs .insert(blob.index as usize, kzg_verified_blob); @@ -154,7 +184,7 @@ impl DataAvailabilityChecker { if let Some(executed_block) = cache.executed_block.take() { self.check_block_availability_maybe_cache(occupied_entry, executed_block)? } else { - Availability::PendingBlock(block_root) + Availability::PendingBlock(blob.block_root) } } Entry::Vacant(vacant_entry) => { @@ -169,9 +199,8 @@ impl DataAvailabilityChecker { if let Some(blob_ids) = availability.get_available_blob_ids() { self.prune_rpc_blob_cache(&blob_ids); } else { - self.rpc_blob_cache.write().insert(blob.id(), blob.clone()); + self.rpc_blob_cache.write().insert(blob_id, blob); } - Ok(availability) } @@ -233,6 +262,7 @@ impl DataAvailabilityChecker { .get_filtered_blob_ids(|index| cache.verified_blobs.get(index).is_none()); let _ = cache.executed_block.insert(executed_block); + cache.missing_blob_ids = missing_blob_ids.clone(); Ok(Availability::PendingBlobs(missing_blob_ids)) } diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index c26fe75727..caa358904b 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -64,10 +64,10 @@ use std::{cmp, collections::HashSet}; use task_executor::TaskExecutor; use tokio::sync::mpsc; use types::{ - Attestation, AttesterSlashing, Hash256, LightClientFinalityUpdate, LightClientOptimisticUpdate, - ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedBlobSidecar, - SignedBlsToExecutionChange, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, - SyncCommitteeMessage, SyncSubnetId, + Attestation, AttesterSlashing, BlobSidecar, Hash256, LightClientFinalityUpdate, + LightClientOptimisticUpdate, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, + SignedBlobSidecar, SignedBlsToExecutionChange, SignedContributionAndProof, SignedVoluntaryExit, + SubnetId, SyncCommitteeMessage, SyncSubnetId, }; use work_reprocessing_queue::{ spawn_reprocess_scheduler, QueuedAggregate, QueuedLightClientUpdate, QueuedRpcBlock, @@ -119,7 +119,7 @@ const MAX_GOSSIP_BLOCK_QUEUE_LEN: usize = 1_024; /// The maximum number of queued `SignedBeaconBlockAndBlobsSidecar` objects received on gossip that /// will be stored before we start dropping them. -const MAX_GOSSIP_BLOCK_AND_BLOB_QUEUE_LEN: usize = 1_024; +const MAX_GOSSIP_BLOB_QUEUE_LEN: usize = 1_024; /// The maximum number of queued `SignedBeaconBlock` objects received prior to their slot (but /// within acceptable clock disparity) that will be queued before we start dropping them. @@ -160,6 +160,7 @@ const MAX_SYNC_CONTRIBUTION_QUEUE_LEN: usize = 1024; /// The maximum number of queued `SignedBeaconBlock` objects received from the network RPC that /// will be stored before we start dropping them. const MAX_RPC_BLOCK_QUEUE_LEN: usize = 1_024; +const MAX_RPC_BLOB_QUEUE_LEN: usize = 1_024 * 4; // TODO(sean) make function of max blobs per block? or is this just too big? /// The maximum number of queued `Vec` objects received during syncing that will /// be stored before we start dropping them. @@ -229,6 +230,7 @@ pub const GOSSIP_SYNC_CONTRIBUTION: &str = "gossip_sync_contribution"; pub const GOSSIP_LIGHT_CLIENT_FINALITY_UPDATE: &str = "light_client_finality_update"; pub const GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic_update"; pub const RPC_BLOCK: &str = "rpc_block"; +pub const RPC_BLOB: &str = "rpc_blob"; pub const CHAIN_SEGMENT: &str = "chain_segment"; pub const STATUS_PROCESSING: &str = "status_processing"; pub const BLOCKS_BY_RANGE_REQUEST: &str = "blocks_by_range_request"; @@ -607,7 +609,7 @@ impl WorkEvent { /// sent to the other side of `result_tx`. pub fn rpc_beacon_block( block_root: Hash256, - block: BlockWrapper, + block: Arc>, seen_timestamp: Duration, process_type: BlockProcessType, ) -> Self { @@ -623,6 +625,21 @@ impl WorkEvent { } } + pub fn rpc_blob( + blob: Arc>, + seen_timestamp: Duration, + process_type: BlockProcessType, + ) -> Self { + Self { + drop_during_sync: false, + work: Work::RpcBlob { + block: blob, + seen_timestamp, + process_type, + }, + } + } + /// Create a new work event to import `blocks` as a beacon chain segment. pub fn chain_segment( process_id: ChainSegmentProcessId, @@ -914,11 +931,16 @@ pub enum Work { }, RpcBlock { block_root: Hash256, - block: BlockWrapper, + block: Arc>, seen_timestamp: Duration, process_type: BlockProcessType, should_process: bool, }, + RpcBlob { + block: Arc>, + seen_timestamp: Duration, + process_type: BlockProcessType, + }, ChainSegment { process_id: ChainSegmentProcessId, blocks: Vec>, @@ -978,6 +1000,7 @@ impl Work { Work::GossipLightClientFinalityUpdate { .. } => GOSSIP_LIGHT_CLIENT_FINALITY_UPDATE, Work::GossipLightClientOptimisticUpdate { .. } => GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE, Work::RpcBlock { .. } => RPC_BLOCK, + Work::RpcBlob { .. } => RPC_BLOB, Work::ChainSegment { .. } => CHAIN_SEGMENT, Work::Status { .. } => STATUS_PROCESSING, Work::BlocksByRangeRequest { .. } => BLOCKS_BY_RANGE_REQUEST, @@ -1128,11 +1151,11 @@ impl BeaconProcessor { // Using a FIFO queue since blocks need to be imported sequentially. let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN); + let mut rpc_blob_queue = FifoQueue::new(MAX_RPC_BLOB_QUEUE_LEN); let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN); let mut backfill_chain_segment = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN); let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN); - let mut gossip_block_and_blobs_sidecar_queue = - FifoQueue::new(MAX_GOSSIP_BLOCK_AND_BLOB_QUEUE_LEN); + let mut gossip_blob_queue = FifoQueue::new(MAX_GOSSIP_BLOB_QUEUE_LEN); let mut delayed_block_queue = FifoQueue::new(MAX_DELAYED_BLOCK_QUEUE_LEN); let mut status_queue = FifoQueue::new(MAX_STATUS_QUEUE_LEN); @@ -1239,6 +1262,8 @@ impl BeaconProcessor { // evolves. } else if let Some(item) = rpc_block_queue.pop() { self.spawn_worker(item, toolbox); + } else if let Some(item) = rpc_blob_queue.pop() { + self.spawn_worker(item, toolbox); // Check delayed blocks before gossip blocks, the gossip blocks might rely // on the delayed ones. } else if let Some(item) = delayed_block_queue.pop() { @@ -1247,7 +1272,7 @@ impl BeaconProcessor { // required to verify some attestations. } else if let Some(item) = gossip_block_queue.pop() { self.spawn_worker(item, toolbox); - } else if let Some(item) = gossip_block_and_blobs_sidecar_queue.pop() { + } else if let Some(item) = gossip_blob_queue.pop() { self.spawn_worker(item, toolbox); // Check the aggregates, *then* the unaggregates since we assume that // aggregates are more valuable to local validators and effectively give us @@ -1463,7 +1488,7 @@ impl BeaconProcessor { gossip_block_queue.push(work, work_id, &self.log) } Work::GossipSignedBlobSidecar { .. } => { - gossip_block_and_blobs_sidecar_queue.push(work, work_id, &self.log) + gossip_blob_queue.push(work, work_id, &self.log) } Work::DelayedImportBlock { .. } => { delayed_block_queue.push(work, work_id, &self.log) @@ -1488,6 +1513,7 @@ impl BeaconProcessor { optimistic_update_queue.push(work, work_id, &self.log) } Work::RpcBlock { .. } => rpc_block_queue.push(work, work_id, &self.log), + Work::RpcBlob { .. } => rpc_blob_queue.push(work, work_id, &self.log), Work::ChainSegment { ref process_id, .. } => match process_id { ChainSegmentProcessId::RangeBatchId { .. } | ChainSegmentProcessId::ParentLookup { .. } => { @@ -1557,6 +1583,10 @@ impl BeaconProcessor { &metrics::BEACON_PROCESSOR_RPC_BLOCK_QUEUE_TOTAL, rpc_block_queue.len() as i64, ); + metrics::set_gauge( + &metrics::BEACON_PROCESSOR_RPC_BLOB_QUEUE_TOTAL, + rpc_blob_queue.len() as i64, + ); metrics::set_gauge( &metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_QUEUE_TOTAL, chain_segment_queue.len() as i64, @@ -1906,6 +1936,15 @@ impl BeaconProcessor { duplicate_cache, should_process, )), + Work::RpcBlob { + block, + seen_timestamp, + process_type, + } => task_spawner.spawn_async(worker.process_rpc_blob( + block, + seen_timestamp, + process_type, + )), /* * Verification for a chain segment (multiple blocks). */ diff --git a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs index 4a56530799..96d75f9d03 100644 --- a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs +++ b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs @@ -14,7 +14,6 @@ use super::MAX_SCHEDULED_WORK_QUEUE_LEN; use crate::metrics; use crate::sync::manager::BlockProcessType; use beacon_chain::blob_verification::AsBlock; -use beacon_chain::blob_verification::BlockWrapper; use beacon_chain::{BeaconChainTypes, GossipVerifiedBlock, MAXIMUM_GOSSIP_CLOCK_DISPARITY}; use fnv::FnvHashMap; use futures::task::Poll; @@ -25,13 +24,15 @@ use slog::{debug, error, trace, warn, Logger}; use slot_clock::SlotClock; use std::collections::{HashMap, HashSet}; use std::pin::Pin; +use std::sync::Arc; use std::task::Context; use std::time::Duration; use task_executor::TaskExecutor; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio_util::time::delay_queue::{DelayQueue, Key as DelayKey}; use types::{ - Attestation, EthSpec, Hash256, LightClientOptimisticUpdate, SignedAggregateAndProof, SubnetId, + Attestation, EthSpec, Hash256, LightClientOptimisticUpdate, SignedAggregateAndProof, + SignedBeaconBlock, SubnetId, }; const TASK_NAME: &str = "beacon_processor_reprocess_queue"; @@ -135,7 +136,7 @@ pub struct QueuedGossipBlock { /// It is queued for later import. pub struct QueuedRpcBlock { pub block_root: Hash256, - pub block: BlockWrapper, + pub block: Arc>, pub process_type: BlockProcessType, pub seen_timestamp: Duration, /// Indicates if the beacon chain should process this block or not. diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs index e718a01d64..264d02b9b1 100644 --- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -18,7 +18,7 @@ use lighthouse_network::PeerAction; use slog::{debug, error, info, warn}; use std::sync::Arc; use tokio::sync::mpsc; -use types::{Epoch, Hash256, SignedBeaconBlock}; +use types::{BlobSidecar, Epoch, Hash256, SignedBeaconBlock}; /// Id associated to a batch processing request, either a sync batch or a parent lookup. #[derive(Clone, Debug, PartialEq)] @@ -45,7 +45,7 @@ impl Worker { pub async fn process_rpc_block( self, block_root: Hash256, - block: BlockWrapper, + block: Arc>, seen_timestamp: Duration, process_type: BlockProcessType, reprocess_tx: mpsc::Sender>, @@ -136,6 +136,27 @@ impl Worker { drop(handle); } + pub async fn process_rpc_blob( + self, + blob: Arc>, + seen_timestamp: Duration, + process_type: BlockProcessType, + ) { + let result = self + .chain + .check_availability_and_maybe_import( + |chain| chain.data_availability_checker.put_rpc_blob(blob), + CountUnrealized::True, + ) + .await; + + // Sync handles these results + self.send_sync_message(SyncMessage::BlobProcessed { + process_type, + result, + }); + } + /// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync /// thread if more blocks are needed to process it. pub async fn process_chain_segment( diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index fed799988b..b4fe8514d0 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -451,8 +451,8 @@ impl Router { } id @ (SyncId::BackFillBlocks { .. } | SyncId::RangeBlocks { .. } - | SyncId::BackFillBlobs { .. } - | SyncId::RangeBlobs { .. }) => id, + | SyncId::BackFillBlockAndBlobs { .. } + | SyncId::RangeBlockAndBlobs { .. }) => id, }, RequestId::Router => unreachable!("All BBRange requests belong to sync"), }; @@ -510,8 +510,8 @@ impl Router { id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id, SyncId::BackFillBlocks { .. } | SyncId::RangeBlocks { .. } - | SyncId::RangeBlobs { .. } - | SyncId::BackFillBlobs { .. } => { + | SyncId::RangeBlockAndBlobs { .. } + | SyncId::BackFillBlockAndBlobs { .. } => { unreachable!("Batch syncing do not request BBRoot requests") } }, @@ -543,8 +543,8 @@ impl Router { id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id, SyncId::BackFillBlocks { .. } | SyncId::RangeBlocks { .. } - | SyncId::RangeBlobs { .. } - | SyncId::BackFillBlobs { .. } => { + | SyncId::RangeBlockAndBlobs { .. } + | SyncId::BackFillBlockAndBlobs { .. } => { unreachable!("Batch syncing does not request BBRoot requests") } }, diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index c51ca1ff92..47e964a7c3 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -1,11 +1,12 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; +use std::sync::Arc; use std::thread::sleep; use std::time::Duration; use beacon_chain::blob_verification::AsBlock; use beacon_chain::blob_verification::BlockWrapper; -use beacon_chain::{BeaconChainTypes, BlockError}; +use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; use fnv::FnvHashMap; use lighthouse_network::rpc::{RPCError, RPCResponseErrorCode}; use lighthouse_network::{PeerAction, PeerId}; @@ -14,6 +15,7 @@ use slog::{debug, error, trace, warn, Logger}; use smallvec::SmallVec; use store::Hash256; use types::blob_sidecar::BlobIdentifier; +use types::{BlobSidecar, SignedBeaconBlock}; use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent}; use crate::metrics; @@ -36,7 +38,7 @@ mod single_block_lookup; #[cfg(test)] mod tests; -pub type RootBlockTuple = (Hash256, BlockWrapper); +pub type RootBlockTuple = (Hash256, Arc>); const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60; const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 3; @@ -145,6 +147,7 @@ impl BlockLookups { cx: &mut SyncNetworkContext, ) { //TODO(sean) handle delay + //TODO(sean) cannot use peer id here cause it assumes it has the block, this is from gossip so not true self.search_block(hash, peer_id, cx); } @@ -206,7 +209,7 @@ impl BlockLookups { &mut self, id: Id, peer_id: PeerId, - block: Option>, + block: Option>>, seen_timestamp: Duration, cx: &mut SyncNetworkContext, ) { @@ -271,7 +274,7 @@ impl BlockLookups { &mut self, id: Id, peer_id: PeerId, - block: Option>, + block: Option>>, seen_timestamp: Duration, cx: &mut SyncNetworkContext, ) { @@ -349,6 +352,28 @@ impl BlockLookups { ); } + pub fn single_lookup_blob_response( + &mut self, + id: Id, + peer_id: PeerId, + block: Option>>, + seen_timestamp: Duration, + cx: &mut SyncNetworkContext, + ) { + todo!() + } + + pub fn parent_lookup_blob_response( + &mut self, + id: Id, + peer_id: PeerId, + block: Option>>, + seen_timestamp: Duration, + cx: &mut SyncNetworkContext, + ) { + todo!() + } + /* Error responses */ #[allow(clippy::needless_collect)] // false positive @@ -472,11 +497,18 @@ impl BlockLookups { }; match result { - BlockProcessResult::Ok => { - trace!(self.log, "Single block processing succeeded"; "block" => %root); - } - BlockProcessResult::MissingBlobs(blobs) => { - todo!() + BlockProcessResult::Ok(status) => { + match status { + AvailabilityProcessingStatus::Imported(hash) => { + trace!(self.log, "Single block processing succeeded"; "block" => %root); + } + AvailabilityProcessingStatus::PendingBlobs(blobs) => { + // trigger? + } + AvailabilityProcessingStatus::PendingBlock(hash) => { + // logic error + } + } } BlockProcessResult::Ignored => { // Beacon processor signalled to ignore the block processing result. @@ -558,11 +590,18 @@ impl BlockLookups { }; match &result { - BlockProcessResult::Ok => { - trace!(self.log, "Parent block processing succeeded"; &parent_lookup) - } - BlockProcessResult::MissingBlobs(blobs) => { - todo!() + BlockProcessResult::Ok(status) => { + match status { + AvailabilityProcessingStatus::Imported(hash) => { + trace!(self.log, "Parent block processing succeeded"; &parent_lookup) + } + AvailabilityProcessingStatus::PendingBlobs(blobs) => { + // trigger? + } + AvailabilityProcessingStatus::PendingBlock(hash) => { + // logic error + } + } } BlockProcessResult::Err(e) => { trace!(self.log, "Parent block processing failed"; &parent_lookup, "error" => %e) @@ -578,8 +617,11 @@ impl BlockLookups { } match result { - BlockProcessResult::MissingBlobs(blobs) => { - todo!() + BlockProcessResult::Ok(AvailabilityProcessingStatus::PendingBlock(_)) => { + // doesn't make sense + } + BlockProcessResult::Ok(AvailabilityProcessingStatus::PendingBlobs(blobs)) => { + // trigger } BlockProcessResult::Err(BlockError::ParentUnknown(block)) => { // need to keep looking for parents @@ -587,7 +629,7 @@ impl BlockLookups { parent_lookup.add_block(block); self.request_parent(parent_lookup, cx); } - BlockProcessResult::Ok + BlockProcessResult::Ok(AvailabilityProcessingStatus::Imported(_)) | BlockProcessResult::Err(BlockError::BlockIsAlreadyKnown { .. }) => { // Check if the beacon processor is available let beacon_processor_send = match cx.processor_channel_if_enabled() { @@ -666,6 +708,24 @@ impl BlockLookups { ); } + pub fn single_blob_processed( + &mut self, + id: Id, + result: BlockProcessResult, + cx: &mut SyncNetworkContext, + ) { + todo!() + } + + pub fn parent_blob_processed( + &mut self, + chain_hash: Hash256, + result: BlockProcessResult, + cx: &mut SyncNetworkContext, + ) { + todo!() + } + pub fn parent_chain_processed( &mut self, chain_hash: Hash256, @@ -709,7 +769,7 @@ impl BlockLookups { fn send_block_for_processing( &mut self, block_root: Hash256, - block: BlockWrapper, + block: Arc>, duration: Duration, process_type: BlockProcessType, cx: &mut SyncNetworkContext, diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index cbb5356800..5d669ae1e4 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -7,8 +7,10 @@ use beacon_chain::blob_verification::AsBlock; use beacon_chain::blob_verification::BlockWrapper; use beacon_chain::BeaconChainTypes; use lighthouse_network::PeerId; +use std::sync::Arc; use store::Hash256; use strum::IntoStaticStr; +use types::{BlobSidecar, SignedBeaconBlock}; use super::single_block_lookup::{self, SingleBlockRequest}; @@ -25,6 +27,7 @@ pub(crate) struct ParentLookup { chain_hash: Hash256, /// The blocks that have currently been downloaded. downloaded_blocks: Vec>, + downloaded_blobs: Vec>>>>, /// Request of the last parent. current_parent_request: SingleBlockRequest, /// Id of the last parent request. @@ -59,12 +62,18 @@ impl ParentLookup { .any(|(root, _d_block)| root == block_root) } - pub fn new(block_root: Hash256, block: BlockWrapper, peer_id: PeerId) -> Self { + pub fn new( + block_root: Hash256, + block_wrapper: BlockWrapper, + peer_id: PeerId, + ) -> Self { + let (block, blobs) = block_wrapper.deconstruct(); let current_parent_request = SingleBlockRequest::new(block.parent_root(), peer_id); Self { chain_hash: block_root, downloaded_blocks: vec![(block_root, block)], + downloaded_blobs: vec![blobs], current_parent_request, current_parent_request_id: None, } @@ -94,10 +103,12 @@ impl ParentLookup { self.current_parent_request.check_peer_disconnected(peer_id) } - pub fn add_block(&mut self, block: BlockWrapper) { - let next_parent = block.parent_root(); + pub fn add_block(&mut self, block_wrapper: BlockWrapper) { + let next_parent = block_wrapper.parent_root(); let current_root = self.current_parent_request.hash; + let (block, blobs) = block_wrapper.deconstruct(); self.downloaded_blocks.push((current_root, block)); + self.downloaded_blobs.push(blobs); self.current_parent_request.hash = next_parent; self.current_parent_request.state = single_block_lookup::State::AwaitingDownload; self.current_parent_request_id = None; @@ -120,14 +131,23 @@ impl ParentLookup { let ParentLookup { chain_hash, downloaded_blocks, + downloaded_blobs, current_parent_request, current_parent_request_id: _, } = self; let block_count = downloaded_blocks.len(); let mut blocks = Vec::with_capacity(block_count); let mut hashes = Vec::with_capacity(block_count); - for (hash, block) in downloaded_blocks { - blocks.push(block); + for ((hash, block), blobs) in downloaded_blocks + .into_iter() + .zip(downloaded_blobs.into_iter()) + { + let wrapped_block = if let Some(blobs) = blobs { + BlockWrapper::BlockAndBlobs(block, blobs) + } else { + BlockWrapper::Block(block) + }; + blocks.push(wrapped_block); hashes.push(hash); } (chain_hash, blocks, hashes, current_parent_request) @@ -152,7 +172,7 @@ impl ParentLookup { /// the processing result of the block. pub fn verify_block( &mut self, - block: Option>, + block: Option>>, failed_chains: &mut lru_cache::LRUTimeCache, ) -> Result>, VerifyError> { let root_and_block = self.current_parent_request.verify_block(block)?; diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 60911dbb39..3698da33c1 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -6,10 +6,15 @@ use lighthouse_network::{rpc::BlocksByRootRequest, PeerId}; use rand::seq::IteratorRandom; use ssz_types::VariableList; use std::collections::HashSet; +use std::sync::Arc; use store::{EthSpec, Hash256}; use strum::IntoStaticStr; +use types::blob_sidecar::BlobIdentifier; +use types::SignedBeaconBlock; /// Object representing a single block lookup request. +/// +//previously assumed we would have a single block. Now we may have the block but not the blobs #[derive(PartialEq, Eq)] pub struct SingleBlockRequest { /// The hash of the requested block. @@ -24,6 +29,7 @@ pub struct SingleBlockRequest { failed_processing: u8, /// How many times have we attempted to download this block. failed_downloading: u8, + missing_blobs: Vec, } #[derive(Debug, PartialEq, Eq)] @@ -59,6 +65,7 @@ impl SingleBlockRequest { used_peers: HashSet::default(), failed_processing: 0, failed_downloading: 0, + missing_blobs: vec![], } } @@ -105,7 +112,7 @@ impl SingleBlockRequest { /// Returns the block for processing if the response is what we expected. pub fn verify_block( &mut self, - block: Option>, + block: Option>>, ) -> Result>, VerifyError> { match self.state { State::AwaitingDownload => { @@ -116,7 +123,7 @@ impl SingleBlockRequest { Some(block) => { // Compute the block root using this specific function so that we can get timing // metrics. - let block_root = get_block_root(block.as_block()); + let block_root = get_block_root(&block); if block_root != self.hash { // return an error and drop the block // NOTE: we take this is as a download failure to prevent counting the diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 29d2c2937d..70bd419413 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -83,11 +83,11 @@ pub enum RequestId { /// Request was from the backfill sync algorithm. BackFillBlocks { id: Id }, /// Backfill request that is composed by both a block range request and a blob range request. - BackFillBlobs { id: Id }, + BackFillBlockAndBlobs { id: Id }, /// The request was from a chain in the range sync algorithm. RangeBlocks { id: Id }, /// Range request that is composed by both a block range request and a blob range request. - RangeBlobs { id: Id }, + RangeBlockAndBlobs { id: Id }, } // TODO(diva) I'm updating functions what at a time, but this should be revisited because I think @@ -157,6 +157,12 @@ pub enum SyncMessage { process_type: BlockProcessType, result: BlockProcessResult, }, + + /// Block processed + BlobProcessed { + process_type: BlockProcessType, + result: Result>, + }, } /// The type of processing specified for a received block. @@ -168,8 +174,7 @@ pub enum BlockProcessType { #[derive(Debug)] pub enum BlockProcessResult { - Ok, - MissingBlobs(Vec), + Ok(AvailabilityProcessingStatus), Err(BlockError), Ignored, } @@ -322,7 +327,7 @@ impl SyncManager { } } - RequestId::BackFillBlobs { id } => { + RequestId::BackFillBlockAndBlobs { id } => { if let Some(batch_id) = self .network .backfill_request_failed(id, ByRangeRequestType::BlocksAndBlobs) @@ -351,7 +356,7 @@ impl SyncManager { self.update_sync_state() } } - RequestId::RangeBlobs { id } => { + RequestId::RangeBlockAndBlobs { id } => { if let Some((chain_id, batch_id)) = self .network .range_sync_request_failed(id, ByRangeRequestType::BlocksAndBlobs) @@ -576,24 +581,14 @@ impl SyncManager { beacon_block, seen_timestamp, } => { - self.rpc_block_or_blob_received( - request_id, - peer_id, - beacon_block.into(), - seen_timestamp, - ); + self.rpc_block_received(request_id, peer_id, beacon_block, seen_timestamp); } SyncMessage::RpcBlob { request_id, peer_id, blob_sidecar, seen_timestamp, - } => self.rpc_block_or_blob_received( - request_id, - peer_id, - blob_sidecar.into(), - seen_timestamp, - ), + } => self.rpc_blob_received(request_id, peer_id, blob_sidecar, seen_timestamp), SyncMessage::UnknownBlock(peer_id, block, block_root) => { // If we are not synced or within SLOT_IMPORT_TOLERANCE of the block, ignore if !self.network_globals.sync_state.read().is_synced() { @@ -670,6 +665,18 @@ impl SyncManager { .block_lookups .parent_block_processed(chain_hash, result, &mut self.network), }, + SyncMessage::BlobProcessed { + process_type, + result, + } => match process_type { + BlockProcessType::SingleBlock { id } => { + self.block_lookups + .single_blob_processed(id, result.into(), &mut self.network) + } + BlockProcessType::ParentLookup { chain_hash } => self + .block_lookups + .parent_blob_processed(chain_hash, result.into(), &mut self.network), + }, SyncMessage::BatchProcessed { sync_type, result } => match sync_type { ChainSegmentProcessId::RangeBatchId(chain_id, epoch, _) => { self.range_sync.handle_block_process_result( @@ -763,50 +770,30 @@ impl SyncManager { } } - fn rpc_block_or_blob_received( + fn rpc_block_received( &mut self, request_id: RequestId, peer_id: PeerId, - block_or_blob: BlockOrBlob, + block: Option>>, seen_timestamp: Duration, ) { match request_id { - RequestId::SingleBlock { id } => { - // TODO(diva) adjust when dealing with by root requests. This code is here to - // satisfy dead code analysis - match block_or_blob { - BlockOrBlob::Block(maybe_block) => { - self.block_lookups.single_block_lookup_response( - id, - peer_id, - maybe_block.map(BlockWrapper::Block), - seen_timestamp, - &mut self.network, - ) - } - BlockOrBlob::Sidecar(_) => unimplemented!("Mimatch between BlockWrapper and what the network receives needs to be handled first."), - } - } - RequestId::ParentLookup { id } => { - // TODO(diva) adjust when dealing with by root requests. This code is here to - // satisfy dead code analysis - match block_or_blob { - BlockOrBlob::Block(maybe_block) => self.block_lookups.parent_lookup_response( - id, - peer_id, - maybe_block.map(BlockWrapper::Block), - seen_timestamp, - &mut self.network, - ), - BlockOrBlob::Sidecar(_) => unimplemented!("Mimatch between BlockWrapper and what the network receives needs to be handled first."), - } - } + RequestId::SingleBlock { id } => self.block_lookups.single_block_lookup_response( + id, + peer_id, + block, + seen_timestamp, + &mut self.network, + ), + RequestId::ParentLookup { id } => self.block_lookups.parent_lookup_response( + id, + peer_id, + block, + seen_timestamp, + &mut self.network, + ), RequestId::BackFillBlocks { id } => { - let maybe_block = match block_or_blob { - BlockOrBlob::Block(maybe_block) => maybe_block, - BlockOrBlob::Sidecar(_) => todo!("I think this is unreachable"), - }; - let is_stream_terminator = maybe_block.is_none(); + let is_stream_terminator = block.is_none(); if let Some(batch_id) = self .network .backfill_sync_only_blocks_response(id, is_stream_terminator) @@ -816,7 +803,7 @@ impl SyncManager { batch_id, &peer_id, id, - maybe_block.map(|block| block.into()), + block.map(BlockWrapper::Block), ) { Ok(ProcessResult::SyncCompleted) => self.update_sync_state(), Ok(ProcessResult::Successful) => {} @@ -829,14 +816,10 @@ impl SyncManager { } } RequestId::RangeBlocks { id } => { - let maybe_block = match block_or_blob { - BlockOrBlob::Block(maybe_block) => maybe_block, - BlockOrBlob::Sidecar(_) => todo!("I think this should be unreachable, since this is a range only-blocks request, and the network should not accept this chunk at all. Needs better handling"), - }; - let is_stream_terminator = maybe_block.is_none(); + let is_stream_terminator = block.is_none(); if let Some((chain_id, batch_id)) = self .network - .range_sync_block_response(id, is_stream_terminator) + .range_sync_block_only_response(id, is_stream_terminator) { self.range_sync.blocks_by_range_response( &mut self.network, @@ -844,17 +827,53 @@ impl SyncManager { chain_id, batch_id, id, - maybe_block.map(|block| block.into()), + block.map(BlockWrapper::Block), ); self.update_sync_state(); } } - - RequestId::BackFillBlobs { id } => { - self.backfill_block_and_blobs_response(id, peer_id, block_or_blob) + RequestId::BackFillBlockAndBlobs { id } => { + self.backfill_block_and_blobs_response(id, peer_id, block.into()) } - RequestId::RangeBlobs { id } => { - self.range_block_and_blobs_response(id, peer_id, block_or_blob) + RequestId::RangeBlockAndBlobs { id } => { + self.range_block_and_blobs_response(id, peer_id, block.into()) + } + } + } + + fn rpc_blob_received( + &mut self, + request_id: RequestId, + peer_id: PeerId, + blob: Option>>, + seen_timestamp: Duration, + ) { + match request_id { + RequestId::SingleBlock { id } => self.block_lookups.single_lookup_blob_response( + id, + peer_id, + blob, + seen_timestamp, + &mut self.network, + ), + RequestId::ParentLookup { id } => self.block_lookups.parent_lookup_blob_response( + id, + peer_id, + blob, + seen_timestamp, + &mut self.network, + ), + RequestId::BackFillBlocks { id } => { + todo!() + } + RequestId::RangeBlocks { id } => { + todo!() + } + RequestId::BackFillBlockAndBlobs { id } => { + self.backfill_block_and_blobs_response(id, peer_id, blob.into()) + } + RequestId::RangeBlockAndBlobs { id } => { + self.range_block_and_blobs_response(id, peer_id, blob.into()) } } } @@ -898,7 +917,7 @@ impl SyncManager { "peer_id" => %peer_id, "batch_id" => resp.batch_id, "error" => e ); // TODO: penalize the peer for being a bad boy - let id = RequestId::RangeBlobs { id }; + let id = RequestId::RangeBlockAndBlobs { id }; self.inject_error(peer_id, id, RPCError::InvalidData(e.into())) } } @@ -950,7 +969,7 @@ impl SyncManager { "peer_id" => %peer_id, "batch_id" => resp.batch_id, "error" => e ); // TODO: penalize the peer for being a bad boy - let id = RequestId::BackFillBlobs { id }; + let id = RequestId::BackFillBlockAndBlobs { id }; self.inject_error(peer_id, id, RPCError::InvalidData(e.into())) } } @@ -963,14 +982,8 @@ impl From>> { fn from(result: Result>) -> Self { match result { - Ok(AvailabilityProcessingStatus::Imported(_)) => BlockProcessResult::Ok, - Ok(AvailabilityProcessingStatus::PendingBlock(_)) => { - todo!() // doesn't make sense - } - Ok(AvailabilityProcessingStatus::PendingBlobs(blobs)) => { - BlockProcessResult::MissingBlobs(blobs) - } - Err(e) => e.into(), + Ok(status) => BlockProcessResult::Ok(status), + Err(e) => BlockProcessResult::Err(e), } } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 6cdd7e5c0e..d6bab8729a 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -186,7 +186,7 @@ impl SyncNetworkContext { // create the shared request id. This is fine since the rpc handles substream ids. let id = self.next_id(); - let request_id = RequestId::Sync(SyncRequestId::RangeBlobs { id }); + let request_id = RequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }); // Create the blob request based on the blob request. let blobs_request = Request::BlobsByRange(BlobsByRangeRequest { @@ -259,7 +259,7 @@ impl SyncNetworkContext { // create the shared request id. This is fine since the rpc handles substream ids. let id = self.next_id(); - let request_id = RequestId::Sync(SyncRequestId::BackFillBlobs { id }); + let request_id = RequestId::Sync(SyncRequestId::BackFillBlockAndBlobs { id }); // Create the blob request based on the blob request. let blobs_request = Request::BlobsByRange(BlobsByRangeRequest { @@ -288,7 +288,7 @@ impl SyncNetworkContext { } /// Response for a request that is only for blocks. - pub fn range_sync_block_response( + pub fn range_sync_block_only_response( &mut self, request_id: Id, is_stream_terminator: bool, diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index b53fa872dd..6a620b695d 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -685,7 +685,7 @@ mod tests { range.add_peer(&mut rig.cx, local_info, peer1, head_info); let ((chain1, batch1), id1) = match rig.grab_request(&peer1).0 { RequestId::Sync(crate::sync::manager::RequestId::RangeBlocks { id }) => { - (rig.cx.range_sync_block_response(id, true).unwrap(), id) + (rig.cx.range_sync_block_only_response(id, true).unwrap(), id) } other => panic!("unexpected request {:?}", other), }; @@ -704,7 +704,7 @@ mod tests { range.add_peer(&mut rig.cx, local_info, peer2, finalized_info); let ((chain2, batch2), id2) = match rig.grab_request(&peer2).0 { RequestId::Sync(crate::sync::manager::RequestId::RangeBlocks { id }) => { - (rig.cx.range_sync_block_response(id, true).unwrap(), id) + (rig.cx.range_sync_block_only_response(id, true).unwrap(), id) } other => panic!("unexpected request {:?}", other), }; diff --git a/consensus/types/src/blob_sidecar.rs b/consensus/types/src/blob_sidecar.rs index ce6d7e0e61..de2c5e487f 100644 --- a/consensus/types/src/blob_sidecar.rs +++ b/consensus/types/src/blob_sidecar.rs @@ -11,7 +11,9 @@ use test_random_derive::TestRandom; use tree_hash_derive::TreeHash; /// Container of the data that identifies an individual blob. -#[derive(Serialize, Deserialize, Encode, Decode, TreeHash, Clone, Debug, PartialEq, Eq, Hash)] +#[derive( + Serialize, Deserialize, Encode, Decode, TreeHash, Copy, Clone, Debug, PartialEq, Eq, Hash, +)] pub struct BlobIdentifier { pub block_root: Hash256, pub index: u64,