From bacec52017794e9065f2deb6cd899a002859d3ff Mon Sep 17 00:00:00 2001 From: realbigsean Date: Thu, 20 Apr 2023 19:42:33 -0400 Subject: [PATCH] parent blob lookups --- beacon_node/beacon_chain/src/beacon_chain.rs | 2 +- .../beacon_chain/src/blob_verification.rs | 30 +++---- .../beacon_chain/src/block_verification.rs | 6 +- .../beacon_processor/worker/gossip_methods.rs | 37 +++++--- .../network/src/sync/block_lookups/mod.rs | 18 ++++ .../sync/block_lookups/single_block_lookup.rs | 30 +++++++ beacon_node/network/src/sync/manager.rs | 87 +++++++++++++++---- 7 files changed, 158 insertions(+), 52 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index a151db70b9..9d7ce3dcda 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1981,7 +1981,7 @@ impl BeaconChain { self: &Arc, blob_sidecar: SignedBlobSidecar, subnet_id: u64, - ) -> Result, BlobError> { + ) -> Result, BlobError> { blob_verification::validate_blob_sidecar_for_gossip(blob_sidecar, subnet_id, self) } diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index 358863b253..44c0879570 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -14,16 +14,16 @@ use crate::data_availability_checker::{ use crate::kzg_utils::{validate_blob, validate_blobs}; use crate::BeaconChainError; use kzg::Kzg; -use types::blob_sidecar::BlobIdentifier; use std::borrow::Cow; +use types::blob_sidecar::BlobIdentifier; use types::{ - BeaconBlockRef, BeaconState, BeaconStateError, BlobSidecar, ChainSpec, - CloneConfig, Epoch, EthSpec, Hash256, KzgCommitment, RelativeEpoch, SignedBeaconBlock, - SignedBeaconBlockHeader, SignedBlobSidecar, Slot, + BeaconBlockRef, BeaconState, BeaconStateError, BlobSidecar, ChainSpec, CloneConfig, Epoch, + EthSpec, Hash256, KzgCommitment, RelativeEpoch, SignedBeaconBlock, SignedBeaconBlockHeader, + SignedBlobSidecar, Slot, }; #[derive(Debug)] -pub enum BlobError { +pub enum BlobError { /// The blob sidecar is from a slot that is later than the current slot (with respect to the /// gossip clock disparity). /// @@ -95,10 +95,7 @@ pub enum BlobError { /// ## Peer scoring /// /// We cannot process the blob without validating its parent, the peer isn't necessarily faulty. - BlobParentUnknown { - blob_root: Hash256, - blob_parent_root: Hash256, - }, + BlobParentUnknown(Arc>), /// A blob has already been seen for the given `(sidecar.block_root, sidecar.index)` tuple /// over gossip or no gossip sources. @@ -113,13 +110,13 @@ pub enum BlobError { }, } -impl From for BlobError { +impl From for BlobError { fn from(e: BeaconChainError) -> Self { BlobError::BeaconChainError(e) } } -impl From for BlobError { +impl From for BlobError { fn from(e: BeaconStateError) -> Self { BlobError::BeaconChainError(BeaconChainError::BeaconStateError(e)) } @@ -127,7 +124,7 @@ impl From for BlobError { /// A wrapper around a `BlobSidecar` that indicates it has been approved for re-gossiping on /// the p2p network. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct GossipVerifiedBlob { blob: Arc>, } @@ -151,7 +148,7 @@ pub fn validate_blob_sidecar_for_gossip( signed_blob_sidecar: SignedBlobSidecar, subnet: u64, chain: &BeaconChain, -) -> Result, BlobError> { +) -> Result, BlobError> { let blob_slot = signed_blob_sidecar.message.slot; let blob_index = signed_blob_sidecar.message.index; let block_root = signed_blob_sidecar.message.block_root; @@ -219,10 +216,7 @@ pub fn validate_blob_sidecar_for_gossip( }); } } else { - return Err(BlobError::BlobParentUnknown { - blob_root: block_root, - blob_parent_root: block_parent_root, - }); + return Err(BlobError::BlobParentUnknown(signed_blob_sidecar.message)); } // Note: The spec checks the signature directly against `blob_sidecar.message.proposer_index` @@ -359,7 +353,7 @@ fn cheap_state_advance_to_obtain_committees<'a, E: EthSpec>( state_root_opt: Option, blob_slot: Slot, spec: &ChainSpec, -) -> Result>, BlobError> { +) -> Result>, BlobError> { let block_epoch = blob_slot.epoch(E::slots_per_epoch()); if state.current_epoch() == block_epoch { diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 39110b7a7c..8c6be1f5da 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -310,13 +310,13 @@ pub enum BlockError { ParentExecutionPayloadInvalid { parent_root: Hash256, }, - BlobValidation(BlobError), + BlobValidation(BlobError), AvailabilityCheck(AvailabilityCheckError), MissingBlockParts(Slot, Hash256), } -impl From for BlockError { - fn from(e: BlobError) -> Self { +impl From> for BlockError { + fn from(e: BlobError) -> Self { Self::BlobValidation(e) } } diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 6dc1c8ed04..3d11d1c8e4 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -679,19 +679,15 @@ impl Worker { } Err(err) => { match err { - BlobError::BlobParentUnknown { - blob_root, - blob_parent_root, - } => { + BlobError::BlobParentUnknown(blob) => { debug!( self.log, "Unknown parent hash for blob"; "action" => "requesting parent", - "blob_root" => %blob_root, - "parent_root" => %blob_parent_root + "blob_root" => %blob.block_root, + "parent_root" => %blob.block_parent_root ); - // TODO: send blob to reprocessing queue and queue a sync request for the blob. - todo!(); + self.send_sync_message(SyncMessage::BlobParentUnknown(peer_id, blob)); } BlobError::ProposerSignatureInvalid | BlobError::UnknownValidator(_) @@ -754,6 +750,9 @@ impl Worker { // This value is not used presently, but it might come in handy for debugging. _seen_duration: Duration, ) { + let blob_root = verified_blob.block_root(); + let blob_slot = verified_blob.slot(); + let blob_clone = verified_blob.clone().to_blob(); match self .chain .process_blob(verified_blob, CountUnrealized::True) @@ -768,9 +767,25 @@ impl Worker { slot, peer_id, block_hash, )); } - Err(_err) => { - // handle errors - todo!() + Err(err) => { + debug!( + self.log, + "Invalid gossip blob"; + "outcome" => ?err, + "block root" => ?blob_root, + "block slot" => blob_slot, + "blob index" => blob_clone.index, + ); + self.gossip_penalize_peer( + peer_id, + PeerAction::MidToleranceError, + "bad_gossip_blob_ssz", + ); + trace!( + self.log, + "Invalid gossip blob ssz"; + "ssz" => format_args!("0x{}", hex::encode(blob_clone.as_ssz_bytes())), + ); } } } diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index b7028b3d8b..3dee1c3336 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -26,6 +26,7 @@ use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent}; use crate::metrics; use crate::sync::block_lookups::single_block_lookup::LookupVerifyError; +mod hg5e3wdtrfqa; mod parent_lookup; mod single_block_lookup; #[cfg(test)] @@ -201,6 +202,23 @@ impl BlockLookups { ); } + pub fn search_current_unknown_blob_parent( + &mut self, + blob: Arc>, + peer_id: PeerId, + cx: &mut SyncNetworkContext, + ) { + let block_root = blob.block_root; + self.search_block_with( + |request| { + let _ = request.add_blob(blob.clone()); + }, + block_root, + peer_id, + cx, + ); + } + /// If a block is attempted to be processed but we do not know its parent, this function is /// called in order to find the block's parent. pub fn search_parent( diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index caef901191..f87751029d 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -101,6 +101,36 @@ impl SingleBlockLookup>, + ) -> Result, LookupVerifyError> { + let block_root = blob.block_root; + + if let Some(blob_opt) = self.downloaded_blobs.get_mut(blob.index as usize) { + //TODO(sean) should we log a warn if there is already a downloaded blob? + *blob_opt = Some(blob.clone()); + + if let Some(block) = self.downloaded_block.as_ref() { + match self.da_checker.wrap_block( + block_root, + block.clone(), + self.downloaded_blobs.clone(), + ) { + Ok(wrapper) => Ok(LookupDownloadStatus::Process(wrapper)), + Err(AvailabilityCheckError::MissingBlobs) => { + Ok(LookupDownloadStatus::SearchBlock(block_root)) + } + Err(_e) => Err(LookupVerifyError::AvailabilityCheck), + } + } else { + Ok(LookupDownloadStatus::SearchBlock(block_root)) + } + } else { + return Err(LookupVerifyError::InvalidIndex(blob.index)); + } + } + pub fn add_blobs( &mut self, block_root: Hash256, diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 8e77998cce..2b3183e1ef 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -121,6 +121,8 @@ pub enum SyncMessage { /// A block with an unknown parent has been received. UnknownBlock(PeerId, BlockWrapper, Hash256), + BlobParentUnknown(PeerId, Arc>), + /// A peer has sent an attestation that references a block that is unknown. This triggers the /// manager to attempt to find the block matching the unknown hash. UnknownBlockHashFromAttestation(PeerId, Hash256), @@ -631,24 +633,9 @@ impl SyncManager { seen_timestamp, } => self.rpc_blob_received(request_id, peer_id, blob_sidecar, seen_timestamp), SyncMessage::UnknownBlock(peer_id, block, block_root) => { - // If we are not synced or within SLOT_IMPORT_TOLERANCE of the block, ignore - if !self.network_globals.sync_state.read().is_synced() { - let head_slot = self.chain.canonical_head.cached_head().head_slot(); - let unknown_block_slot = block.slot(); + let block_slot = block.slot(); - // if the block is far in the future, ignore it. If its within the slot tolerance of - // our current head, regardless of the syncing state, fetch it. - if (head_slot >= unknown_block_slot - && head_slot.sub(unknown_block_slot).as_usize() > SLOT_IMPORT_TOLERANCE) - || (head_slot < unknown_block_slot - && unknown_block_slot.sub(head_slot).as_usize() > SLOT_IMPORT_TOLERANCE) - { - return; - } - } - if self.network_globals.peers.read().is_connected(&peer_id) - && self.network.is_execution_engine_online() - { + if self.synced_and_connected_within_tolerance(block_slot, &peer_id) { let parent_root = block.parent_root(); //TODO(sean) what about early blocks let slot = match self.chain.slot_clock.now() { @@ -662,8 +649,6 @@ impl SyncManager { } }; - let block_slot = block.slot(); - if block_slot == slot { if let Err(e) = self .delayed_lookups @@ -688,6 +673,47 @@ impl SyncManager { ); } } + SyncMessage::BlobParentUnknown(peer_id, blob) => { + let blob_slot = blob.slot; + + if self.synced_and_connected_within_tolerance(blob_slot, &peer_id) { + let block_root = blob.block_root; + let parent_root = blob.block_parent_root; + //TODO(sean) what about early blocks + let slot = match self.chain.slot_clock.now() { + Some(slot) => slot, + None => { + error!( + self.log, + "Could not read slot clock, dropping unknown blob parent message" + ); + return; + } + }; + + if blob_slot == slot { + if let Err(e) = self + .delayed_lookups + .try_send(SyncMessage::BlobParentUnknown(peer_id, blob)) + { + warn!(self.log, "Delayed lookups dropped for blob"; "block_root" => ?block_root); + } + } else { + self.block_lookups.search_current_unknown_blob_parent( + blob, + peer_id, + &mut self.network, + ); + } + self.block_lookups.search_parent( + blob_slot, + block_root, + parent_root, + peer_id, + &mut self.network, + ); + } + } SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_hash) => { // If we are not synced, ignore this block. if self.synced_and_connected(&peer_id) { @@ -778,6 +804,29 @@ impl SyncManager { } } + fn synced_and_connected_within_tolerance( + &mut self, + block_slot: Slot, + peer_id: &PeerId, + ) -> bool { + if !self.network_globals.sync_state.read().is_synced() { + let head_slot = self.chain.canonical_head.cached_head().head_slot(); + + // if the block is far in the future, ignore it. If its within the slot tolerance of + // our current head, regardless of the syncing state, fetch it. + if (head_slot >= block_slot + && head_slot.sub(block_slot).as_usize() > SLOT_IMPORT_TOLERANCE) + || (head_slot < block_slot + && block_slot.sub(head_slot).as_usize() > SLOT_IMPORT_TOLERANCE) + { + return false; + } + } + + self.network_globals.peers.read().is_connected(&peer_id) + && self.network.is_execution_engine_online() + } + fn synced_and_connected(&mut self, peer_id: &PeerId) -> bool { self.network_globals.sync_state.read().is_synced() && self.network_globals.peers.read().is_connected(&peer_id)