mirror of
https://github.com/sigp/lighthouse.git
synced 2026-04-20 14:28:37 +00:00
add tests and fix some things
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
use beacon_chain::blob_verification::{AsBlock, BlockWrapper};
|
||||
use beacon_chain::data_availability_checker::{AvailabilityCheckError, DataAvailabilityChecker};
|
||||
use beacon_chain::data_availability_checker::{DataAvailabilityChecker};
|
||||
use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError};
|
||||
use lighthouse_network::rpc::RPCError;
|
||||
use lighthouse_network::{PeerAction, PeerId};
|
||||
@@ -15,7 +15,7 @@ use strum::Display;
|
||||
use types::blob_sidecar::FixedBlobSidecarList;
|
||||
use types::{BlobSidecar, SignedBeaconBlock, Slot};
|
||||
|
||||
use self::parent_lookup::{LookupDownloadStatus, PARENT_FAIL_TOLERANCE};
|
||||
use self::parent_lookup::PARENT_FAIL_TOLERANCE;
|
||||
use self::parent_lookup::{ParentLookup, ParentVerifyError};
|
||||
use self::single_block_lookup::{LookupVerifyError, SingleBlockLookup};
|
||||
use super::manager::BlockProcessingResult;
|
||||
@@ -26,6 +26,7 @@ use super::{
|
||||
};
|
||||
use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent};
|
||||
use crate::metrics;
|
||||
use crate::sync::block_lookups::single_block_lookup::UnknownParentComponents;
|
||||
|
||||
mod parent_lookup;
|
||||
mod single_block_lookup;
|
||||
@@ -111,12 +112,6 @@ impl PeerShouldHave {
|
||||
PeerShouldHave::Neither(_) => false,
|
||||
}
|
||||
}
|
||||
fn should_have_blobs(&self) -> bool {
|
||||
match self {
|
||||
PeerShouldHave::BlockAndBlobs(_) => true,
|
||||
PeerShouldHave::Neither(_) => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
@@ -146,19 +141,37 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
|
||||
pub fn search_block(
|
||||
&mut self,
|
||||
hash: Hash256,
|
||||
block_root: Hash256,
|
||||
peer_source: PeerShouldHave,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) {
|
||||
self.search_block_with(|_| {}, hash, peer_source, cx)
|
||||
self.search_block_with(block_root, None, None, peer_source, cx)
|
||||
}
|
||||
|
||||
pub fn search_current_unknown_parent_block_and_blobs(
|
||||
&mut self,
|
||||
block_root: Hash256,
|
||||
block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
|
||||
blobs: Option<FixedBlobSidecarList<T::EthSpec>>,
|
||||
peer_id: PeerId,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) {
|
||||
self.search_block_with(
|
||||
block_root,
|
||||
block,
|
||||
blobs,
|
||||
PeerShouldHave::Neither(peer_id),
|
||||
cx,
|
||||
);
|
||||
}
|
||||
|
||||
/// Searches for a single block hash. If the blocks parent is unknown, a chain of blocks is
|
||||
/// constructed.
|
||||
pub fn search_block_with(
|
||||
&mut self,
|
||||
cache_fn: impl Fn(&mut SingleBlockLookup<SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, T>),
|
||||
hash: Hash256,
|
||||
block_root: Hash256,
|
||||
block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
|
||||
blobs: Option<FixedBlobSidecarList<T::EthSpec>>,
|
||||
peer_source: PeerShouldHave,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) {
|
||||
@@ -167,14 +180,15 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
.single_block_lookups
|
||||
.iter_mut()
|
||||
.any(|(_, _, single_block_request)| {
|
||||
single_block_request.add_peer_if_useful(&hash, peer_source)
|
||||
single_block_request.add_peer_if_useful(&block_root, peer_source)
|
||||
})
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
if self.parent_lookups.iter_mut().any(|parent_req| {
|
||||
parent_req.add_peer_if_useful(&hash, peer_source) || parent_req.contains_block(&hash)
|
||||
parent_req.add_peer_if_useful(&block_root, peer_source)
|
||||
|| parent_req.contains_block(&block_root)
|
||||
}) {
|
||||
// If the block was already downloaded, or is being downloaded in this moment, do not
|
||||
// request it.
|
||||
@@ -184,7 +198,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
if self
|
||||
.processing_parent_lookups
|
||||
.values()
|
||||
.any(|(hashes, _last_parent_request)| hashes.contains(&hash))
|
||||
.any(|(hashes, _last_parent_request)| hashes.contains(&block_root))
|
||||
{
|
||||
// we are already processing this block, ignore it.
|
||||
return;
|
||||
@@ -193,13 +207,26 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
debug!(
|
||||
self.log,
|
||||
"Searching for block";
|
||||
"peer_id" => %peer_source,
|
||||
"block" => %hash
|
||||
"peer_id" => ?peer_source,
|
||||
"block" => ?block_root
|
||||
);
|
||||
|
||||
let mut single_block_request =
|
||||
SingleBlockLookup::new(hash, peer_source, self.da_checker.clone());
|
||||
cache_fn(&mut single_block_request);
|
||||
let triggered_by_unknown_parent = block.is_some() || blobs.is_some();
|
||||
let unknown_parent_components = if triggered_by_unknown_parent {
|
||||
Some(UnknownParentComponents {
|
||||
downloaded_block: block,
|
||||
downloaded_blobs: blobs.unwrap_or_default(),
|
||||
})
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let mut single_block_request = SingleBlockLookup::new(
|
||||
block_root,
|
||||
unknown_parent_components,
|
||||
peer_source,
|
||||
self.da_checker.clone(),
|
||||
);
|
||||
|
||||
let block_request_id =
|
||||
if let Ok(Some((peer_id, block_request))) = single_block_request.request_block() {
|
||||
@@ -224,40 +251,6 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
);
|
||||
}
|
||||
|
||||
pub fn search_current_unknown_parent(
|
||||
&mut self,
|
||||
block_root: Hash256,
|
||||
block: BlockWrapper<T::EthSpec>,
|
||||
peer_id: PeerId,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) {
|
||||
self.search_block_with(
|
||||
|request| {
|
||||
let _ = request.add_block_wrapper(block_root, block.clone());
|
||||
},
|
||||
block_root,
|
||||
PeerShouldHave::Neither(peer_id),
|
||||
cx,
|
||||
);
|
||||
}
|
||||
|
||||
pub fn search_current_unknown_blob_parent(
|
||||
&mut self,
|
||||
blob: Arc<BlobSidecar<T::EthSpec>>,
|
||||
peer_id: PeerId,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) {
|
||||
let block_root = blob.block_root;
|
||||
self.search_block_with(
|
||||
|request| {
|
||||
let _ = request.add_blob(blob.clone());
|
||||
},
|
||||
block_root,
|
||||
PeerShouldHave::Neither(peer_id),
|
||||
cx,
|
||||
);
|
||||
}
|
||||
|
||||
/// If a block is attempted to be processed but we do not know its parent, this function is
|
||||
/// called in order to find the block's parent.
|
||||
pub fn search_parent(
|
||||
@@ -268,7 +261,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
peer_id: PeerId,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) {
|
||||
// Gossip blocks or blobs shouldn't be propogated if parents are unavailable.
|
||||
// Gossip blocks or blobs shouldn't be propagated if parents are unavailable.
|
||||
let peer_source = PeerShouldHave::BlockAndBlobs(peer_id);
|
||||
|
||||
// If this block or it's parent is part of a known failed chain, ignore it.
|
||||
@@ -319,33 +312,24 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
let stream_terminator = block.is_none().into();
|
||||
let log = self.log.clone();
|
||||
|
||||
let Some((triggered_parent_request, request_id_ref, request_ref)) = self.find_single_lookup_request(id, stream_terminator, ResponseType::Block) else {
|
||||
let Some((pending_parent_request, request_id_ref, request_ref)) = self.find_single_lookup_request(id, stream_terminator, ResponseType::Block) else {
|
||||
return;
|
||||
};
|
||||
|
||||
let should_remove = match request_ref.verify_block(block) {
|
||||
Ok(Some((root, block))) => {
|
||||
if triggered_parent_request {
|
||||
if let LookupDownloadStatus::AvailabilityCheck(e) =
|
||||
request_ref.add_block(root, block)
|
||||
{
|
||||
handle_availability_check_error(
|
||||
request_id_ref,
|
||||
request_ref,
|
||||
ResponseType::Blob,
|
||||
peer_id,
|
||||
e,
|
||||
cx,
|
||||
&log,
|
||||
)
|
||||
} else {
|
||||
ShouldRemoveLookup::False
|
||||
}
|
||||
} else {
|
||||
Ok(Some((block_root, block))) => {
|
||||
if let Some(parent_components) = request_ref.unknown_parent_components.as_mut() {
|
||||
parent_components.add_unknown_parent_block(block.clone());
|
||||
};
|
||||
|
||||
if !pending_parent_request {
|
||||
let block_wrapper = request_ref
|
||||
.get_downloaded_block()
|
||||
.unwrap_or(BlockWrapper::Block(block));
|
||||
// This is the correct block, send it for processing
|
||||
match self.send_block_for_processing(
|
||||
root,
|
||||
BlockWrapper::Block(block),
|
||||
block_root,
|
||||
block_wrapper,
|
||||
seen_timestamp,
|
||||
BlockProcessType::SingleBlock { id },
|
||||
cx,
|
||||
@@ -353,6 +337,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
Ok(()) => ShouldRemoveLookup::False,
|
||||
Err(()) => ShouldRemoveLookup::True,
|
||||
}
|
||||
} else {
|
||||
ShouldRemoveLookup::False
|
||||
}
|
||||
}
|
||||
Ok(None) => ShouldRemoveLookup::False,
|
||||
@@ -390,26 +376,32 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
|
||||
let log = self.log.clone();
|
||||
|
||||
let Some((triggered_parent_request, request_id_ref, request_ref)) =
|
||||
let Some((pending_parent_requests, request_id_ref, request_ref)) =
|
||||
self.find_single_lookup_request(id, stream_terminator, ResponseType::Blob) else {
|
||||
return;
|
||||
};
|
||||
|
||||
let should_remove = match request_ref.verify_blob(blob) {
|
||||
Ok(Some((block_root, blobs))) => {
|
||||
if triggered_parent_request {
|
||||
if let LookupDownloadStatus::AvailabilityCheck(e) =
|
||||
request_ref.add_blobs(block_root, blobs)
|
||||
{
|
||||
handle_availability_check_error(
|
||||
request_id_ref,
|
||||
request_ref,
|
||||
ResponseType::Blob,
|
||||
peer_id,
|
||||
e,
|
||||
cx,
|
||||
&log,
|
||||
)
|
||||
if let Some(parent_components) = request_ref.unknown_parent_components.as_mut() {
|
||||
parent_components.add_unknown_parent_blobs(blobs);
|
||||
|
||||
if !pending_parent_requests {
|
||||
request_ref
|
||||
.get_downloaded_block()
|
||||
.map(|block| {
|
||||
match self.send_block_for_processing(
|
||||
block_root,
|
||||
block,
|
||||
seen_timestamp,
|
||||
BlockProcessType::SingleBlock { id },
|
||||
cx,
|
||||
) {
|
||||
Ok(()) => ShouldRemoveLookup::False,
|
||||
Err(()) => ShouldRemoveLookup::True,
|
||||
}
|
||||
})
|
||||
.unwrap_or(ShouldRemoveLookup::False)
|
||||
} else {
|
||||
ShouldRemoveLookup::False
|
||||
}
|
||||
@@ -525,40 +517,41 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
|
||||
match parent_lookup.verify_block(block, &mut self.failed_chains) {
|
||||
Ok(Some((block_root, block))) => {
|
||||
let process_or_search = parent_lookup.add_block(block_root, block);
|
||||
match process_or_search {
|
||||
LookupDownloadStatus::Process(wrapper) => {
|
||||
let chain_hash = parent_lookup.chain_hash();
|
||||
if self
|
||||
.send_block_for_processing(
|
||||
block_root,
|
||||
wrapper,
|
||||
seen_timestamp,
|
||||
BlockProcessType::ParentLookup { chain_hash },
|
||||
cx,
|
||||
)
|
||||
.is_ok()
|
||||
{
|
||||
self.parent_lookups.push(parent_lookup)
|
||||
}
|
||||
parent_lookup.add_current_request_block(block);
|
||||
if let Some(block_wrapper) =
|
||||
parent_lookup.current_parent_request.get_downloaded_block()
|
||||
{
|
||||
let chain_hash = parent_lookup.chain_hash();
|
||||
if self
|
||||
.send_block_for_processing(
|
||||
block_root,
|
||||
block_wrapper,
|
||||
seen_timestamp,
|
||||
BlockProcessType::ParentLookup { chain_hash },
|
||||
cx,
|
||||
)
|
||||
.is_ok()
|
||||
{
|
||||
self.parent_lookups.push(parent_lookup)
|
||||
}
|
||||
LookupDownloadStatus::SearchBlock(block_root) => {
|
||||
if let Some(peer_source) =
|
||||
parent_lookup.peer_source(ResponseType::Block, peer_id)
|
||||
{
|
||||
self.search_block(block_root, peer_source, cx);
|
||||
self.parent_lookups.push(parent_lookup)
|
||||
} else {
|
||||
warn!(self.log, "Response from untracked peer"; "peer_id" => %peer_id, "block_root" => ?block_root);
|
||||
} else {
|
||||
let outstanding_blobs_req =
|
||||
parent_lookup.current_parent_blob_request_id.is_some();
|
||||
if !outstanding_blobs_req {
|
||||
if let Ok(peer_id) = parent_lookup
|
||||
.current_parent_request
|
||||
.downloading_peer(ResponseType::Blob) {
|
||||
cx.report_peer(
|
||||
peer_id.to_peer_id(),
|
||||
PeerAction::MidToleranceError,
|
||||
"bbroot_failed_chains",
|
||||
);
|
||||
}
|
||||
|
||||
self.request_parent_blob(parent_lookup, cx);
|
||||
} else {
|
||||
self.parent_lookups.push(parent_lookup)
|
||||
}
|
||||
LookupDownloadStatus::AvailabilityCheck(e) => self.handle_invalid_block(
|
||||
BlockError::AvailabilityCheck(e),
|
||||
peer_id,
|
||||
cx,
|
||||
ResponseType::Block,
|
||||
parent_lookup,
|
||||
),
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
@@ -629,11 +622,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
seen_timestamp: Duration,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) {
|
||||
let mut parent_lookup = if let Some(pos) = self
|
||||
.parent_lookups
|
||||
.iter()
|
||||
.position(|request| request.pending_blob_response(id))
|
||||
{
|
||||
let mut parent_lookup = if let Some(pos) = self.parent_lookups.iter().position(|request| {
|
||||
request.pending_blob_response(id)
|
||||
}) {
|
||||
self.parent_lookups.remove(pos)
|
||||
} else {
|
||||
if blob.is_some() {
|
||||
@@ -644,40 +635,25 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
|
||||
match parent_lookup.verify_blob(blob, &mut self.failed_chains) {
|
||||
Ok(Some((block_root, blobs))) => {
|
||||
let processed_or_search = parent_lookup.add_blobs(block_root, blobs);
|
||||
match processed_or_search {
|
||||
LookupDownloadStatus::Process(wrapper) => {
|
||||
let chain_hash = parent_lookup.chain_hash();
|
||||
if self
|
||||
.send_block_for_processing(
|
||||
block_root,
|
||||
wrapper,
|
||||
seen_timestamp,
|
||||
BlockProcessType::ParentLookup { chain_hash },
|
||||
cx,
|
||||
)
|
||||
.is_ok()
|
||||
{
|
||||
self.parent_lookups.push(parent_lookup)
|
||||
}
|
||||
parent_lookup.add_current_request_blobs(blobs);
|
||||
let chain_hash = parent_lookup.chain_hash();
|
||||
if let Some(block_wrapper) =
|
||||
parent_lookup.current_parent_request.get_downloaded_block()
|
||||
{
|
||||
if self
|
||||
.send_block_for_processing(
|
||||
block_root,
|
||||
block_wrapper,
|
||||
seen_timestamp,
|
||||
BlockProcessType::ParentLookup { chain_hash },
|
||||
cx,
|
||||
)
|
||||
.is_ok()
|
||||
{
|
||||
self.parent_lookups.push(parent_lookup)
|
||||
}
|
||||
LookupDownloadStatus::SearchBlock(block_root) => {
|
||||
if let Some(peer_source) =
|
||||
parent_lookup.peer_source(ResponseType::Block, peer_id)
|
||||
{
|
||||
self.search_block(block_root, peer_source, cx);
|
||||
self.parent_lookups.push(parent_lookup)
|
||||
} else {
|
||||
warn!(self.log, "Response from untracked peer"; "peer_id" => %peer_id, "block_root" => ?block_root);
|
||||
}
|
||||
}
|
||||
LookupDownloadStatus::AvailabilityCheck(e) => self.handle_invalid_block(
|
||||
BlockError::AvailabilityCheck(e),
|
||||
peer_id,
|
||||
cx,
|
||||
ResponseType::Blob,
|
||||
parent_lookup,
|
||||
),
|
||||
} else {
|
||||
self.parent_lookups.push(parent_lookup)
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
@@ -865,7 +841,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
|
||||
/* Processing responses */
|
||||
|
||||
pub fn single_block_processed(
|
||||
pub fn single_block_component_processed(
|
||||
&mut self,
|
||||
target_id: Id,
|
||||
result: BlockProcessingResult<T::EthSpec>,
|
||||
@@ -874,19 +850,21 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
) {
|
||||
let lookup_components_opt = self.single_block_lookups.iter_mut().enumerate().find_map(
|
||||
|(index, (block_id_opt, blob_id_opt, req))| {
|
||||
let id_filter = |id: &&mut Id| -> bool { target_id == **id };
|
||||
|
||||
let block_id = block_id_opt.as_mut().filter(id_filter);
|
||||
let blob_id = blob_id_opt.as_mut().filter(id_filter);
|
||||
block_id.or(blob_id).map(|id_ref| (index, id_ref, req))
|
||||
let id_filter = |id: &Id| -> bool { target_id == *id };
|
||||
let block_matches = block_id_opt.as_ref().map(id_filter).unwrap_or(false);
|
||||
let blob_matches = blob_id_opt.as_ref().map(id_filter).unwrap_or(false);
|
||||
if !block_matches && !blob_matches {
|
||||
return None;
|
||||
}
|
||||
Some((index, block_id_opt, blob_id_opt, req))
|
||||
},
|
||||
);
|
||||
let (index, request_id_ref, request_ref) = match lookup_components_opt {
|
||||
let (index, block_id_ref, blob_id_ref, request_ref) = match lookup_components_opt {
|
||||
Some(req) => req,
|
||||
None => {
|
||||
return debug!(
|
||||
self.log,
|
||||
"Block processed for single block lookup not present"
|
||||
"Block component processed for single block lookup not present"
|
||||
);
|
||||
}
|
||||
};
|
||||
@@ -906,8 +884,115 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
ShouldRemoveLookup::True
|
||||
}
|
||||
AvailabilityProcessingStatus::MissingComponents(_, block_root) => {
|
||||
self.search_block(block_root, peer_id, cx);
|
||||
ShouldRemoveLookup::False
|
||||
// if peer should have both, and missing components is received after we've
|
||||
// processed the opposite, then we can downscore.
|
||||
let should_remove = match response_type {
|
||||
ResponseType::Block => {
|
||||
if request_ref.blob_request_state.component_processed {
|
||||
match request_ref.processing_peer(ResponseType::Blob) {
|
||||
Ok(PeerShouldHave::BlockAndBlobs(other_peer)) => {
|
||||
cx.report_peer(
|
||||
peer_id.to_peer_id(),
|
||||
PeerAction::MidToleranceError,
|
||||
"single_block_failure",
|
||||
);
|
||||
if let Some(blob_id_ref) = blob_id_ref {
|
||||
// Try it again if possible.
|
||||
retry_request_after_failure(
|
||||
blob_id_ref,
|
||||
request_ref,
|
||||
ResponseType::Blob,
|
||||
peer_id.as_peer_id(),
|
||||
cx,
|
||||
&self.log,
|
||||
)
|
||||
} else {
|
||||
ShouldRemoveLookup::False
|
||||
}
|
||||
}
|
||||
Ok(PeerShouldHave::Neither(other_peer)) => {
|
||||
request_ref
|
||||
.blob_request_state
|
||||
.remove_peer_if_useless(&other_peer);
|
||||
if let Some(blob_id_ref) = blob_id_ref {
|
||||
// Try it again if possible.
|
||||
retry_request_after_failure(
|
||||
blob_id_ref,
|
||||
request_ref,
|
||||
ResponseType::Blob,
|
||||
peer_id.as_peer_id(),
|
||||
cx,
|
||||
&self.log,
|
||||
)
|
||||
} else {
|
||||
ShouldRemoveLookup::False
|
||||
}
|
||||
}
|
||||
Err(()) => {
|
||||
//TODO(sean) retry?
|
||||
ShouldRemoveLookup::False
|
||||
}
|
||||
}
|
||||
} else {
|
||||
request_ref.block_request_state.component_processed = true;
|
||||
ShouldRemoveLookup::False
|
||||
}
|
||||
}
|
||||
ResponseType::Blob => {
|
||||
if request_ref.block_request_state.component_processed {
|
||||
match request_ref.processing_peer(ResponseType::Blob) {
|
||||
Ok(PeerShouldHave::BlockAndBlobs(other_peer)) => {
|
||||
cx.report_peer(
|
||||
other_peer,
|
||||
PeerAction::MidToleranceError,
|
||||
"single_block_failure",
|
||||
);
|
||||
if let Some(blob_id_ref) = blob_id_ref {
|
||||
// Try it again if possible.
|
||||
retry_request_after_failure(
|
||||
blob_id_ref,
|
||||
request_ref,
|
||||
ResponseType::Blob,
|
||||
peer_id.as_peer_id(),
|
||||
cx,
|
||||
&self.log,
|
||||
)
|
||||
} else {
|
||||
ShouldRemoveLookup::False
|
||||
}
|
||||
}
|
||||
Ok(PeerShouldHave::Neither(other_peer)) => {
|
||||
request_ref
|
||||
.blob_request_state
|
||||
.remove_peer_if_useless(&other_peer);
|
||||
if let Some(blob_id_ref) = blob_id_ref {
|
||||
// Try it again if possible.
|
||||
retry_request_after_failure(
|
||||
blob_id_ref,
|
||||
request_ref,
|
||||
ResponseType::Blob,
|
||||
peer_id.as_peer_id(),
|
||||
cx,
|
||||
&self.log,
|
||||
)
|
||||
} else {
|
||||
ShouldRemoveLookup::False
|
||||
}
|
||||
}
|
||||
Err(()) => {
|
||||
//TODO(sean) retry?
|
||||
ShouldRemoveLookup::False
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// retry block here?
|
||||
|
||||
request_ref.blob_request_state.component_processed = true;
|
||||
ShouldRemoveLookup::False
|
||||
}
|
||||
}
|
||||
};
|
||||
should_remove
|
||||
}
|
||||
},
|
||||
BlockProcessingResult::Ignored => {
|
||||
@@ -960,15 +1045,29 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
PeerAction::MidToleranceError,
|
||||
"single_block_failure",
|
||||
);
|
||||
// Try it again if possible.
|
||||
retry_request_after_failure(
|
||||
request_id_ref,
|
||||
request_ref,
|
||||
response_type,
|
||||
peer_id.as_peer_id(),
|
||||
cx,
|
||||
&self.log,
|
||||
)
|
||||
if let Some(blob_id_ref) = blob_id_ref {
|
||||
// Try it again if possible.
|
||||
retry_request_after_failure(
|
||||
blob_id_ref,
|
||||
request_ref,
|
||||
ResponseType::Blob,
|
||||
peer_id.as_peer_id(),
|
||||
cx,
|
||||
&self.log,
|
||||
)
|
||||
} else if let Some(block_id_ref) = block_id_ref {
|
||||
// Try it again if possible.
|
||||
retry_request_after_failure(
|
||||
block_id_ref,
|
||||
request_ref,
|
||||
ResponseType::Block,
|
||||
peer_id.as_peer_id(),
|
||||
cx,
|
||||
&self.log,
|
||||
)
|
||||
} else {
|
||||
ShouldRemoveLookup::True
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1041,7 +1140,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
self.search_block(block_root, peer_id, cx);
|
||||
}
|
||||
BlockProcessingResult::Err(BlockError::ParentUnknown(block)) => {
|
||||
parent_lookup.add_block_wrapper(block);
|
||||
parent_lookup.add_unknown_parent_block(block);
|
||||
self.request_parent_block_and_blobs(parent_lookup, cx);
|
||||
}
|
||||
BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(_))
|
||||
@@ -1057,10 +1156,22 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
);
|
||||
}
|
||||
};
|
||||
let (chain_hash, blocks, hashes, block_request) =
|
||||
let (chain_hash, mut blocks, hashes, block_request) =
|
||||
parent_lookup.parts_for_processing();
|
||||
if let Some(child_block) =
|
||||
self.single_block_lookups
|
||||
.iter_mut()
|
||||
.find_map(|(_, _, req)| {
|
||||
if req.requested_block_root == chain_hash {
|
||||
req.get_downloaded_block()
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
{
|
||||
blocks.push(child_block);
|
||||
};
|
||||
let process_id = ChainSegmentProcessId::ParentLookup(chain_hash);
|
||||
|
||||
let work = WorkEvent::chain_segment(process_id, blocks);
|
||||
|
||||
match beacon_processor_send.try_send(work) {
|
||||
@@ -1090,13 +1201,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
);
|
||||
}
|
||||
BlockProcessingResult::Err(outcome) => {
|
||||
self.handle_invalid_block(
|
||||
outcome,
|
||||
peer_id.to_peer_id(),
|
||||
cx,
|
||||
response_type,
|
||||
parent_lookup,
|
||||
);
|
||||
self.handle_invalid_block(outcome, peer_id.to_peer_id(), cx, parent_lookup);
|
||||
}
|
||||
BlockProcessingResult::Ignored => {
|
||||
// Beacon processor signalled to ignore the block processing result.
|
||||
@@ -1120,7 +1225,6 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
outcome: BlockError<<T as BeaconChainTypes>::EthSpec>,
|
||||
peer_id: PeerId,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
response_type: ResponseType,
|
||||
mut parent_lookup: ParentLookup<T>,
|
||||
) {
|
||||
// all else we consider the chain a failure and downvote the peer that sent
|
||||
@@ -1135,16 +1239,11 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
// ambiguity.
|
||||
cx.report_peer(peer_id, PeerAction::MidToleranceError, "parent_request_err");
|
||||
// Try again if possible
|
||||
match response_type {
|
||||
ResponseType::Block => {
|
||||
parent_lookup.block_processing_failed();
|
||||
self.request_parent_block(parent_lookup, cx);
|
||||
}
|
||||
ResponseType::Blob => {
|
||||
parent_lookup.blob_processing_failed();
|
||||
self.request_parent_blob(parent_lookup, cx);
|
||||
}
|
||||
}
|
||||
//TODO(sean) I don't see why we want to retry here, because this block is invalid and has
|
||||
// passed block root validation, so querying again by block root will yield the same result.
|
||||
parent_lookup.block_processing_failed();
|
||||
parent_lookup.blob_processing_failed();
|
||||
self.request_parent_block_and_blobs(parent_lookup, cx);
|
||||
}
|
||||
|
||||
pub fn parent_chain_processed(
|
||||
@@ -1262,6 +1361,10 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
process_type: BlockProcessType,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) -> Result<(), ()> {
|
||||
let blob_count = blobs.iter().filter(|b| b.is_some()).count();
|
||||
if blob_count == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
match cx.processor_channel_if_enabled() {
|
||||
Some(beacon_processor_send) => {
|
||||
trace!(self.log, "Sending blobs for processing"; "block" => ?block_root, "process_type" => ?process_type);
|
||||
@@ -1385,35 +1488,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_availability_check_error<T: BeaconChainTypes>(
|
||||
request_id_ref: &mut u32,
|
||||
request_ref: &mut SingleBlockLookup<SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, T>,
|
||||
response_type: ResponseType,
|
||||
peer_id: PeerId,
|
||||
e: AvailabilityCheckError,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
log: &Logger,
|
||||
) -> ShouldRemoveLookup {
|
||||
warn!(log, "Peer sent message in single block lookup causing failed availability check";
|
||||
"root" => ?request_ref.requested_block_root,
|
||||
"error" => ?e,
|
||||
"peer_id" => %peer_id,
|
||||
"response_type" => ?response_type
|
||||
);
|
||||
cx.report_peer(
|
||||
peer_id,
|
||||
PeerAction::MidToleranceError,
|
||||
"single_block_failure",
|
||||
);
|
||||
retry_request_after_failure(
|
||||
request_id_ref,
|
||||
request_ref,
|
||||
ResponseType::Blob,
|
||||
&peer_id,
|
||||
cx,
|
||||
log,
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
fn handle_block_lookup_verify_error<T: BeaconChainTypes>(
|
||||
request_id_ref: &mut u32,
|
||||
|
||||
Reference in New Issue
Block a user