parent blob lookups

This commit is contained in:
realbigsean
2023-04-20 19:42:33 -04:00
parent 0cc1704edf
commit bacec52017
7 changed files with 158 additions and 52 deletions

View File

@@ -679,19 +679,15 @@ impl<T: BeaconChainTypes> Worker<T> {
}
Err(err) => {
match err {
BlobError::BlobParentUnknown {
blob_root,
blob_parent_root,
} => {
BlobError::BlobParentUnknown(blob) => {
debug!(
self.log,
"Unknown parent hash for blob";
"action" => "requesting parent",
"blob_root" => %blob_root,
"parent_root" => %blob_parent_root
"blob_root" => %blob.block_root,
"parent_root" => %blob.block_parent_root
);
// TODO: send blob to reprocessing queue and queue a sync request for the blob.
todo!();
self.send_sync_message(SyncMessage::BlobParentUnknown(peer_id, blob));
}
BlobError::ProposerSignatureInvalid
| BlobError::UnknownValidator(_)
@@ -754,6 +750,9 @@ impl<T: BeaconChainTypes> Worker<T> {
// This value is not used presently, but it might come in handy for debugging.
_seen_duration: Duration,
) {
let blob_root = verified_blob.block_root();
let blob_slot = verified_blob.slot();
let blob_clone = verified_blob.clone().to_blob();
match self
.chain
.process_blob(verified_blob, CountUnrealized::True)
@@ -768,9 +767,25 @@ impl<T: BeaconChainTypes> Worker<T> {
slot, peer_id, block_hash,
));
}
Err(_err) => {
// handle errors
todo!()
Err(err) => {
debug!(
self.log,
"Invalid gossip blob";
"outcome" => ?err,
"block root" => ?blob_root,
"block slot" => blob_slot,
"blob index" => blob_clone.index,
);
self.gossip_penalize_peer(
peer_id,
PeerAction::MidToleranceError,
"bad_gossip_blob_ssz",
);
trace!(
self.log,
"Invalid gossip blob ssz";
"ssz" => format_args!("0x{}", hex::encode(blob_clone.as_ssz_bytes())),
);
}
}
}

View File

@@ -26,6 +26,7 @@ use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent};
use crate::metrics;
use crate::sync::block_lookups::single_block_lookup::LookupVerifyError;
mod hg5e3wdtrfqa;
mod parent_lookup;
mod single_block_lookup;
#[cfg(test)]
@@ -201,6 +202,23 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
);
}
pub fn search_current_unknown_blob_parent(
&mut self,
blob: Arc<BlobSidecar<T::EthSpec>>,
peer_id: PeerId,
cx: &mut SyncNetworkContext<T>,
) {
let block_root = blob.block_root;
self.search_block_with(
|request| {
let _ = request.add_blob(blob.clone());
},
block_root,
peer_id,
cx,
);
}
/// If a block is attempted to be processed but we do not know its parent, this function is
/// called in order to find the block's parent.
pub fn search_parent(

View File

@@ -101,6 +101,36 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
None
}
pub fn add_blob(
&mut self,
blob: Arc<BlobSidecar<T::EthSpec>>,
) -> Result<LookupDownloadStatus<T::EthSpec>, LookupVerifyError> {
let block_root = blob.block_root;
if let Some(blob_opt) = self.downloaded_blobs.get_mut(blob.index as usize) {
//TODO(sean) should we log a warn if there is already a downloaded blob?
*blob_opt = Some(blob.clone());
if let Some(block) = self.downloaded_block.as_ref() {
match self.da_checker.wrap_block(
block_root,
block.clone(),
self.downloaded_blobs.clone(),
) {
Ok(wrapper) => Ok(LookupDownloadStatus::Process(wrapper)),
Err(AvailabilityCheckError::MissingBlobs) => {
Ok(LookupDownloadStatus::SearchBlock(block_root))
}
Err(_e) => Err(LookupVerifyError::AvailabilityCheck),
}
} else {
Ok(LookupDownloadStatus::SearchBlock(block_root))
}
} else {
return Err(LookupVerifyError::InvalidIndex(blob.index));
}
}
pub fn add_blobs(
&mut self,
block_root: Hash256,

View File

@@ -121,6 +121,8 @@ pub enum SyncMessage<T: EthSpec> {
/// A block with an unknown parent has been received.
UnknownBlock(PeerId, BlockWrapper<T>, Hash256),
BlobParentUnknown(PeerId, Arc<BlobSidecar<T>>),
/// A peer has sent an attestation that references a block that is unknown. This triggers the
/// manager to attempt to find the block matching the unknown hash.
UnknownBlockHashFromAttestation(PeerId, Hash256),
@@ -631,24 +633,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
seen_timestamp,
} => self.rpc_blob_received(request_id, peer_id, blob_sidecar, seen_timestamp),
SyncMessage::UnknownBlock(peer_id, block, block_root) => {
// If we are not synced or within SLOT_IMPORT_TOLERANCE of the block, ignore
if !self.network_globals.sync_state.read().is_synced() {
let head_slot = self.chain.canonical_head.cached_head().head_slot();
let unknown_block_slot = block.slot();
let block_slot = block.slot();
// if the block is far in the future, ignore it. If its within the slot tolerance of
// our current head, regardless of the syncing state, fetch it.
if (head_slot >= unknown_block_slot
&& head_slot.sub(unknown_block_slot).as_usize() > SLOT_IMPORT_TOLERANCE)
|| (head_slot < unknown_block_slot
&& unknown_block_slot.sub(head_slot).as_usize() > SLOT_IMPORT_TOLERANCE)
{
return;
}
}
if self.network_globals.peers.read().is_connected(&peer_id)
&& self.network.is_execution_engine_online()
{
if self.synced_and_connected_within_tolerance(block_slot, &peer_id) {
let parent_root = block.parent_root();
//TODO(sean) what about early blocks
let slot = match self.chain.slot_clock.now() {
@@ -662,8 +649,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
};
let block_slot = block.slot();
if block_slot == slot {
if let Err(e) = self
.delayed_lookups
@@ -688,6 +673,47 @@ impl<T: BeaconChainTypes> SyncManager<T> {
);
}
}
SyncMessage::BlobParentUnknown(peer_id, blob) => {
let blob_slot = blob.slot;
if self.synced_and_connected_within_tolerance(blob_slot, &peer_id) {
let block_root = blob.block_root;
let parent_root = blob.block_parent_root;
//TODO(sean) what about early blocks
let slot = match self.chain.slot_clock.now() {
Some(slot) => slot,
None => {
error!(
self.log,
"Could not read slot clock, dropping unknown blob parent message"
);
return;
}
};
if blob_slot == slot {
if let Err(e) = self
.delayed_lookups
.try_send(SyncMessage::BlobParentUnknown(peer_id, blob))
{
warn!(self.log, "Delayed lookups dropped for blob"; "block_root" => ?block_root);
}
} else {
self.block_lookups.search_current_unknown_blob_parent(
blob,
peer_id,
&mut self.network,
);
}
self.block_lookups.search_parent(
blob_slot,
block_root,
parent_root,
peer_id,
&mut self.network,
);
}
}
SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_hash) => {
// If we are not synced, ignore this block.
if self.synced_and_connected(&peer_id) {
@@ -778,6 +804,29 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}
fn synced_and_connected_within_tolerance(
&mut self,
block_slot: Slot,
peer_id: &PeerId,
) -> bool {
if !self.network_globals.sync_state.read().is_synced() {
let head_slot = self.chain.canonical_head.cached_head().head_slot();
// if the block is far in the future, ignore it. If its within the slot tolerance of
// our current head, regardless of the syncing state, fetch it.
if (head_slot >= block_slot
&& head_slot.sub(block_slot).as_usize() > SLOT_IMPORT_TOLERANCE)
|| (head_slot < block_slot
&& block_slot.sub(head_slot).as_usize() > SLOT_IMPORT_TOLERANCE)
{
return false;
}
}
self.network_globals.peers.read().is_connected(&peer_id)
&& self.network.is_execution_engine_online()
}
fn synced_and_connected(&mut self, peer_id: &PeerId) -> bool {
self.network_globals.sync_state.read().is_synced()
&& self.network_globals.peers.read().is_connected(&peer_id)