mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-11 18:04:18 +00:00
Check da_checker before doing a block lookup request (#5681)
* Check da_checker before doing a block lookup request * Ensure consistent handling of lookup result * use req resp pre import cache rather than da checker
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
use self::parent_chain::{compute_parent_chains, NodeChain};
|
||||
pub use self::single_block_lookup::DownloadResult;
|
||||
use self::single_block_lookup::{LookupRequestError, SingleBlockLookup};
|
||||
use self::single_block_lookup::{LookupRequestError, LookupResult, SingleBlockLookup};
|
||||
use super::manager::{BlockProcessType, BlockProcessingResult};
|
||||
use super::network_context::{RpcProcessingResult, SyncNetworkContext};
|
||||
use crate::metrics;
|
||||
@@ -17,6 +17,7 @@ use lighthouse_network::{PeerAction, PeerId};
|
||||
use lru_cache::LRUTimeCache;
|
||||
pub use single_block_lookup::{BlobRequestState, BlockRequestState};
|
||||
use slog::{debug, error, trace, warn, Logger};
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use store::Hash256;
|
||||
@@ -266,6 +267,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
"peer_ids" => ?peers,
|
||||
"block" => ?block_root,
|
||||
);
|
||||
metrics::inc_counter(&metrics::SYNC_LOOKUP_CREATED);
|
||||
|
||||
// If we know that this lookup has unknown parent (is awaiting a parent lookup to resolve),
|
||||
// signal here to hold processing downloaded data.
|
||||
@@ -276,14 +278,20 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
lookup.add_child_components(block_component);
|
||||
}
|
||||
|
||||
if let Err(e) = lookup.continue_requests(cx) {
|
||||
self.on_lookup_request_error(lookup.id, e, "new_current_lookup");
|
||||
false
|
||||
} else {
|
||||
self.single_block_lookups.insert(lookup.id, lookup);
|
||||
self.update_metrics();
|
||||
true
|
||||
}
|
||||
let id = lookup.id;
|
||||
let lookup = match self.single_block_lookups.entry(id) {
|
||||
Entry::Vacant(entry) => entry.insert(lookup),
|
||||
Entry::Occupied(_) => {
|
||||
// Should never happen
|
||||
warn!(self.log, "Lookup exists with same id"; "id" => id);
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
let result = lookup.continue_requests(cx);
|
||||
self.on_lookup_result(id, result, "new_current_lookup", cx);
|
||||
self.update_metrics();
|
||||
true
|
||||
}
|
||||
|
||||
/* Lookup responses */
|
||||
@@ -296,9 +304,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
response: RpcProcessingResult<R::VerifiedResponseType>,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) {
|
||||
if let Err(e) = self.on_download_response_inner::<R>(id, peer_id, response, cx) {
|
||||
self.on_lookup_request_error(id, e, "download_response");
|
||||
}
|
||||
let result = self.on_download_response_inner::<R>(id, peer_id, response, cx);
|
||||
self.on_lookup_result(id, result, "download_response", cx);
|
||||
}
|
||||
|
||||
/// Process a block or blob response received from a single lookup request.
|
||||
@@ -308,7 +315,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
peer_id: PeerId,
|
||||
response: RpcProcessingResult<R::VerifiedResponseType>,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) -> Result<(), LookupRequestError> {
|
||||
) -> Result<LookupResult, LookupRequestError> {
|
||||
// Downscore peer even if lookup is not known
|
||||
// Only downscore lookup verify errors. RPC errors are downscored in the network handler.
|
||||
if let Err(LookupFailure::LookupVerifyError(e)) = &response {
|
||||
@@ -321,7 +328,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
// We don't have the ability to cancel in-flight RPC requests. So this can happen
|
||||
// if we started this RPC request, and later saw the block/blobs via gossip.
|
||||
debug!(self.log, "Block returned for single block lookup not present"; "id" => id);
|
||||
return Ok(());
|
||||
return Err(LookupRequestError::UnknownLookup);
|
||||
};
|
||||
|
||||
let block_root = lookup.block_root();
|
||||
@@ -346,7 +353,6 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
peer_id,
|
||||
})?;
|
||||
// continue_request will send for processing as the request state is AwaitingProcessing
|
||||
lookup.continue_request::<R>(cx)
|
||||
}
|
||||
Err(e) => {
|
||||
debug!(self.log,
|
||||
@@ -359,9 +365,10 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
|
||||
request_state.on_download_failure()?;
|
||||
// continue_request will retry a download as the request state is AwaitingDownload
|
||||
lookup.continue_request::<R>(cx)
|
||||
}
|
||||
}
|
||||
|
||||
lookup.continue_requests(cx)
|
||||
}
|
||||
|
||||
/* Error responses */
|
||||
@@ -388,16 +395,15 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
result: BlockProcessingResult<T::EthSpec>,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) {
|
||||
if let Err(e) = match process_type {
|
||||
let lookup_result = match process_type {
|
||||
BlockProcessType::SingleBlock { id } => {
|
||||
self.on_processing_result_inner::<BlockRequestState<T::EthSpec>>(id, result, cx)
|
||||
}
|
||||
BlockProcessType::SingleBlob { id } => {
|
||||
self.on_processing_result_inner::<BlobRequestState<T::EthSpec>>(id, result, cx)
|
||||
}
|
||||
} {
|
||||
self.on_lookup_request_error(process_type.id(), e, "processing_result");
|
||||
}
|
||||
};
|
||||
self.on_lookup_result(process_type.id(), lookup_result, "processing_result", cx);
|
||||
}
|
||||
|
||||
pub fn on_processing_result_inner<R: RequestState<T>>(
|
||||
@@ -405,10 +411,10 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
lookup_id: SingleLookupId,
|
||||
result: BlockProcessingResult<T::EthSpec>,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) -> Result<(), LookupRequestError> {
|
||||
) -> Result<LookupResult, LookupRequestError> {
|
||||
let Some(lookup) = self.single_block_lookups.get_mut(&lookup_id) else {
|
||||
debug!(self.log, "Unknown single block lookup"; "id" => lookup_id);
|
||||
return Ok(());
|
||||
return Err(LookupRequestError::UnknownLookup);
|
||||
};
|
||||
|
||||
let block_root = lookup.block_root();
|
||||
@@ -442,15 +448,17 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
// wrong. If we already had both a block and blobs response processed, we should penalize the
|
||||
// blobs peer because they did not provide all blobs on the initial request.
|
||||
if lookup.both_components_processed() {
|
||||
let blob_peer = lookup
|
||||
if let Some(blob_peer) = lookup
|
||||
.blob_request_state
|
||||
.state
|
||||
.on_post_process_validation_failure()?;
|
||||
cx.report_peer(
|
||||
blob_peer,
|
||||
PeerAction::MidToleranceError,
|
||||
"sent_incomplete_blobs",
|
||||
);
|
||||
.on_post_process_validation_failure()?
|
||||
{
|
||||
cx.report_peer(
|
||||
blob_peer,
|
||||
PeerAction::MidToleranceError,
|
||||
"sent_incomplete_blobs",
|
||||
);
|
||||
}
|
||||
}
|
||||
Action::Retry
|
||||
}
|
||||
@@ -527,47 +535,41 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
Action::Retry => {
|
||||
// Trigger download for all components in case `MissingComponents` failed the blob
|
||||
// request. Also if blobs are `AwaitingProcessing` and need to be progressed
|
||||
lookup.continue_requests(cx)?;
|
||||
lookup.continue_requests(cx)
|
||||
}
|
||||
Action::ParentUnknown { parent_root } => {
|
||||
let peers = lookup.all_available_peers().cloned().collect::<Vec<_>>();
|
||||
lookup.set_awaiting_parent(parent_root);
|
||||
debug!(self.log, "Marking lookup as awaiting parent"; "lookup" => %block_root, "parent_root" => %parent_root);
|
||||
self.search_parent_of_child(parent_root, block_root, &peers, cx);
|
||||
Ok(LookupResult::Pending)
|
||||
}
|
||||
Action::Drop => {
|
||||
// Drop with noop
|
||||
self.drop_lookup_and_children(lookup_id);
|
||||
self.update_metrics();
|
||||
Err(LookupRequestError::Failed)
|
||||
}
|
||||
Action::Continue => {
|
||||
// Drop this completed lookup only
|
||||
self.single_block_lookups.remove(&lookup_id);
|
||||
self.update_metrics();
|
||||
debug!(self.log, "Dropping completed lookup"; "block" => %block_root);
|
||||
// Block imported, continue the requests of pending child blocks
|
||||
self.continue_child_lookups(block_root, cx);
|
||||
Ok(LookupResult::Completed)
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Makes progress on the immediate children of `block_root`
|
||||
pub fn continue_child_lookups(&mut self, block_root: Hash256, cx: &mut SyncNetworkContext<T>) {
|
||||
let mut failed_lookups = vec![]; // < need to clean failed lookups latter to re-borrow &mut self
|
||||
let mut lookup_results = vec![]; // < need to buffer lookup results to not re-borrow &mut self
|
||||
|
||||
for (id, lookup) in self.single_block_lookups.iter_mut() {
|
||||
if lookup.awaiting_parent() == Some(block_root) {
|
||||
lookup.resolve_awaiting_parent();
|
||||
debug!(self.log, "Continuing child lookup"; "parent_root" => %block_root, "block_root" => %lookup.block_root());
|
||||
if let Err(e) = lookup.continue_requests(cx) {
|
||||
failed_lookups.push((*id, e));
|
||||
}
|
||||
let result = lookup.continue_requests(cx);
|
||||
lookup_results.push((*id, result));
|
||||
}
|
||||
}
|
||||
|
||||
for (id, e) in failed_lookups {
|
||||
self.on_lookup_request_error(id, e, "continue_child_lookups");
|
||||
for (id, result) in lookup_results {
|
||||
self.on_lookup_result(id, result, "continue_child_lookups", cx);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -592,16 +594,31 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
}
|
||||
|
||||
/// Common handler a lookup request error, drop it and update metrics
|
||||
fn on_lookup_request_error(
|
||||
fn on_lookup_result(
|
||||
&mut self,
|
||||
id: SingleLookupId,
|
||||
error: LookupRequestError,
|
||||
result: Result<LookupResult, LookupRequestError>,
|
||||
source: &str,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) {
|
||||
debug!(self.log, "Dropping lookup on request error"; "id" => id, "source" => source, "error" => ?error);
|
||||
metrics::inc_counter_vec(&metrics::SYNC_LOOKUP_DROPPED, &[error.as_metric()]);
|
||||
self.drop_lookup_and_children(id);
|
||||
self.update_metrics();
|
||||
match result {
|
||||
Ok(LookupResult::Pending) => {} // no action
|
||||
Ok(LookupResult::Completed) => {
|
||||
if let Some(lookup) = self.single_block_lookups.remove(&id) {
|
||||
debug!(self.log, "Dropping completed lookup"; "block" => %lookup.block_root());
|
||||
metrics::inc_counter(&metrics::SYNC_LOOKUP_COMPLETED);
|
||||
// Block imported, continue the requests of pending child blocks
|
||||
self.continue_child_lookups(lookup.block_root(), cx);
|
||||
self.update_metrics();
|
||||
}
|
||||
}
|
||||
Err(error) => {
|
||||
debug!(self.log, "Dropping lookup on request error"; "id" => id, "source" => source, "error" => ?error);
|
||||
metrics::inc_counter_vec(&metrics::SYNC_LOOKUP_DROPPED, &[error.into()]);
|
||||
self.drop_lookup_and_children(id);
|
||||
self.update_metrics();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Helper functions */
|
||||
|
||||
Reference in New Issue
Block a user