handle parent blob request edge cases correctly. fix data availability boundary check

This commit is contained in:
realbigsean
2022-12-19 11:39:09 -05:00
parent ba1cabc0c9
commit 5de4f5b8d0
17 changed files with 163 additions and 95 deletions

View File

@@ -254,6 +254,14 @@ impl<T: BeaconChainTypes> Worker<T> {
"peer" => %peer_id,
"request_root" => ?root
);
self.send_error_response(
peer_id,
RPCResponseErrorCode::ResourceUnavailable,
"No blob for requested block".into(),
request_id,
);
send_response = false;
break;
}
Ok((None, Some(_))) => {
debug!(

View File

@@ -11,6 +11,7 @@ use crate::error;
use crate::service::{NetworkMessage, RequestId};
use beacon_chain::{BeaconChain, BeaconChainTypes};
use futures::prelude::*;
use lighthouse_network::rpc::RPCError;
use lighthouse_network::{
MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, Response,
};
@@ -58,6 +59,7 @@ pub enum RouterMessage<T: EthSpec> {
RPCFailed {
peer_id: PeerId,
request_id: RequestId,
error: RPCError,
},
/// A gossip message has been received. The fields are: message id, the peer that sent us this
/// message, the message itself and a bool which indicates if the message should be processed
@@ -140,8 +142,9 @@ impl<T: BeaconChainTypes> Router<T> {
RouterMessage::RPCFailed {
peer_id,
request_id,
error,
} => {
self.processor.on_rpc_error(peer_id, request_id);
self.processor.on_rpc_error(peer_id, request_id, error);
}
RouterMessage::PubsubMessage(id, peer_id, gossip, should_process) => {
self.handle_gossip(id, peer_id, gossip, should_process);

View File

@@ -103,12 +103,13 @@ impl<T: BeaconChainTypes> Processor<T> {
/// An error occurred during an RPC request. The state is maintained by the sync manager, so
/// this function notifies the sync manager of the error.
pub fn on_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId) {
pub fn on_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) {
// Check if the failed RPC belongs to sync
if let RequestId::Sync(request_id) = request_id {
self.send_to_sync(SyncMessage::RpcError {
peer_id,
request_id,
error,
});
}
}

View File

@@ -499,10 +499,11 @@ impl<T: BeaconChainTypes> NetworkService<T> {
response,
});
}
NetworkEvent::RPCFailed { id, peer_id } => {
NetworkEvent::RPCFailed { id, peer_id, error } => {
self.send_to_router(RouterMessage::RPCFailed {
peer_id,
request_id: id,
error,
});
}
NetworkEvent::StatusPeer(peer_id) => {

View File

@@ -6,6 +6,7 @@ use beacon_chain::{BeaconChainTypes, BlockError};
use fnv::FnvHashMap;
use futures::StreamExt;
use itertools::{Either, Itertools};
use lighthouse_network::rpc::{RPCError, RPCResponseErrorCode};
use lighthouse_network::{PeerAction, PeerId};
use lru_cache::LRUTimeCache;
use slog::{debug, error, trace, warn, Logger};
@@ -40,6 +41,13 @@ pub type RootBlockTuple<T> = (Hash256, BlockWrapper<T>);
const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60;
const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 3;
/// This is used to resolve the scenario where we request a parent from before the data availability
/// boundary and need to retry with a request for only the block.
pub enum ForceBlockRequest {
True,
False,
}
pub(crate) struct BlockLookups<T: BeaconChainTypes> {
/// Parent chain lookups being downloaded.
parent_lookups: SmallVec<[ParentLookup<T>; 3]>,
@@ -165,7 +173,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
let parent_lookup = ParentLookup::new(block_root, block, peer_id);
self.request_parent(parent_lookup, cx);
self.request_parent(parent_lookup, cx, ForceBlockRequest::False);
}
/* Lookup responses */
@@ -291,7 +299,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
cx.report_peer(peer_id, PeerAction::LowToleranceError, e);
// We try again if possible.
self.request_parent(parent_lookup, cx);
self.request_parent(parent_lookup, cx, ForceBlockRequest::False);
}
VerifyError::PreviousFailure { parent_root } => {
debug!(
@@ -367,7 +375,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
{
let parent_lookup = self.parent_lookups.remove(pos);
trace!(self.log, "Parent lookup's peer disconnected"; &parent_lookup);
self.request_parent(parent_lookup, cx);
self.request_parent(parent_lookup, cx, ForceBlockRequest::False);
}
}
@@ -377,6 +385,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
id: Id,
peer_id: PeerId,
cx: &mut SyncNetworkContext<T>,
error: RPCError,
) {
if let Some(pos) = self
.parent_lookups
@@ -386,7 +395,19 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
let mut parent_lookup = self.parent_lookups.remove(pos);
parent_lookup.download_failed();
trace!(self.log, "Parent lookup request failed"; &parent_lookup);
self.request_parent(parent_lookup, cx);
// `ResourceUnavailable` indicates we requested a parent block from prior to the 4844 fork epoch.
let force_block_request = if let RPCError::ErrorResponse(
RPCResponseErrorCode::ResourceUnavailable,
_,
) = error
{
debug!(self.log, "RPC parent lookup for block and blobs failed. Retrying the request for just a block"; "peer_id" => %peer_id);
ForceBlockRequest::True
} else {
ForceBlockRequest::False
};
self.request_parent(parent_lookup, cx, force_block_request);
} else {
return debug!(self.log, "RPC failure for a parent lookup request that was not found"; "peer_id" => %peer_id);
};
@@ -542,7 +563,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// need to keep looking for parents
// add the block back to the queue and continue the search
parent_lookup.add_block(block);
self.request_parent(parent_lookup, cx);
self.request_parent(parent_lookup, cx, ForceBlockRequest::False);
}
BlockProcessResult::Ok
| BlockProcessResult::Err(BlockError::BlockIsAlreadyKnown { .. }) => {
@@ -604,7 +625,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// Try again if possible
parent_lookup.processing_failed();
self.request_parent(parent_lookup, cx);
self.request_parent(parent_lookup, cx, ForceBlockRequest::False);
}
BlockProcessResult::Ignored => {
// Beacon processor signalled to ignore the block processing result.
@@ -697,8 +718,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
&mut self,
mut parent_lookup: ParentLookup<T>,
cx: &mut SyncNetworkContext<T>,
force_block_request: ForceBlockRequest,
) {
match parent_lookup.request_parent(cx) {
match parent_lookup.request_parent(cx, force_block_request) {
Err(e) => {
debug!(self.log, "Failed to request parent"; &parent_lookup, "error" => e.as_static());
match e {

View File

@@ -6,6 +6,7 @@ use store::{Hash256, SignedBeaconBlock};
use strum::IntoStaticStr;
use types::signed_block_and_blobs::BlockWrapper;
use crate::sync::block_lookups::ForceBlockRequest;
use crate::sync::{
manager::{Id, SLOT_IMPORT_TOLERANCE},
network_context::SyncNetworkContext,
@@ -72,14 +73,18 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
}
/// Attempts to request the next unknown parent. If the request fails, it should be removed.
pub fn request_parent(&mut self, cx: &mut SyncNetworkContext<T>) -> Result<(), RequestError> {
pub fn request_parent(
&mut self,
cx: &mut SyncNetworkContext<T>,
force_block_request: ForceBlockRequest,
) -> Result<(), RequestError> {
// check to make sure this request hasn't failed
if self.downloaded_blocks.len() >= PARENT_DEPTH_TOLERANCE {
return Err(RequestError::ChainTooLong);
}
let (peer_id, request) = self.current_parent_request.request_block()?;
match cx.parent_lookup_request(peer_id, request) {
match cx.parent_lookup_request(peer_id, request, force_block_request) {
Ok(request_id) => {
self.current_parent_request_id = Some(request_id);
Ok(())

View File

@@ -45,6 +45,7 @@ use crate::sync::range_sync::ExpectedBatchTy;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, EngineState};
use futures::StreamExt;
use lighthouse_network::rpc::methods::MAX_REQUEST_BLOCKS;
use lighthouse_network::rpc::{RPCError, RPCResponseErrorCode};
use lighthouse_network::types::{NetworkGlobals, SyncState};
use lighthouse_network::SyncInfo;
use lighthouse_network::{PeerAction, PeerId};
@@ -131,6 +132,7 @@ pub enum SyncMessage<T: EthSpec> {
RpcError {
peer_id: PeerId,
request_id: RequestId,
error: RPCError,
},
/// A batch has been processed by the block processor thread.
@@ -282,7 +284,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
/// Handles RPC errors related to requests that were emitted from the sync manager.
fn inject_error(&mut self, peer_id: PeerId, request_id: RequestId) {
fn inject_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) {
trace!(self.log, "Sync manager received a failed RPC");
match request_id {
RequestId::SingleBlock { id } => {
@@ -291,7 +293,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
RequestId::ParentLookup { id } => {
self.block_lookups
.parent_lookup_failed(id, peer_id, &mut self.network);
.parent_lookup_failed(id, peer_id, &mut self.network, error);
}
RequestId::BackFillSync { id } => {
if let Some(batch_id) = self
@@ -603,7 +605,8 @@ impl<T: BeaconChainTypes> SyncManager<T> {
SyncMessage::RpcError {
peer_id,
request_id,
} => self.inject_error(peer_id, request_id),
error,
} => self.inject_error(peer_id, request_id, error),
SyncMessage::BlockProcessed {
process_type,
result,

View File

@@ -6,6 +6,7 @@ use super::range_sync::{BatchId, ChainId, ExpectedBatchTy};
use crate::beacon_processor::WorkEvent;
use crate::service::{NetworkMessage, RequestId};
use crate::status::ToStatusMessage;
use crate::sync::block_lookups::ForceBlockRequest;
use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState};
use fnv::FnvHashMap;
use lighthouse_network::rpc::methods::BlobsByRangeRequest;
@@ -50,7 +51,7 @@ impl<T: EthSpec> BlockBlobRequestInfo<T> {
}
pub fn pop_response(&mut self) -> Option<SignedBeaconBlockAndBlobsSidecar<T>> {
if !self.accumulated_blocks.is_empty() && !self.accumulated_blocks.is_empty() {
if !self.accumulated_blocks.is_empty() && !self.accumulated_sidecars.is_empty() {
let beacon_block = self.accumulated_blocks.pop_front().expect("non empty");
let blobs_sidecar = self.accumulated_sidecars.pop_front().expect("non empty");
return Some(SignedBeaconBlockAndBlobsSidecar {
@@ -504,11 +505,13 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
&mut self,
peer_id: PeerId,
request: BlocksByRootRequest,
force_block_request: ForceBlockRequest,
) -> Result<Id, &'static str> {
let request = if self
.chain
.is_data_availability_check_required()
.map_err(|_| "Unable to read slot clock")?
&& matches!(force_block_request, ForceBlockRequest::False)
{
trace!(
self.log,