more work

This commit is contained in:
realbigsean
2023-04-11 13:13:13 -04:00
parent 66e09f49d7
commit 25ff6e8a5f
13 changed files with 918 additions and 477 deletions

View File

@@ -4,8 +4,8 @@ use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;
use beacon_chain::blob_verification::AsBlock;
use beacon_chain::blob_verification::BlockWrapper;
use beacon_chain::blob_verification::{AsBlock, MaybeAvailableBlock};
use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError};
use fnv::FnvHashMap;
use itertools::Itertools;
@@ -20,7 +20,8 @@ use types::{BlobSidecar, SignedBeaconBlock};
use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent};
use crate::metrics;
use crate::sync::block_lookups::single_block_lookup::SingleBlobRequest;
use crate::sync::block_lookups::parent_lookup::ParentRequest;
use crate::sync::block_lookups::single_block_lookup::SingleBlobsRequest;
use self::parent_lookup::PARENT_FAIL_TOLERANCE;
use self::{
@@ -28,7 +29,7 @@ use self::{
single_block_lookup::SingleBlockRequest,
};
use super::manager::BlockProcessResult;
use super::manager::BlockOrBlobProcessResult;
use super::BatchProcessResult;
use super::{
manager::{BlockProcessType, Id},
@@ -40,6 +41,7 @@ mod single_block_lookup;
#[cfg(test)]
mod tests;
pub type DownlodedBlocks<T> = (Hash256, MaybeAvailableBlock<T>);
pub type RootBlockTuple<T> = (Hash256, Arc<SignedBeaconBlock<T>>);
const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60;
@@ -49,8 +51,14 @@ pub(crate) struct BlockLookups<T: BeaconChainTypes> {
/// Parent chain lookups being downloaded.
parent_lookups: SmallVec<[ParentLookup<T>; 3]>,
processing_parent_lookups:
HashMap<Hash256, (Vec<Hash256>, SingleBlockRequest<PARENT_FAIL_TOLERANCE>)>,
processing_parent_lookups: HashMap<
Hash256,
(
Vec<Hash256>,
SingleBlockRequest<PARENT_FAIL_TOLERANCE, T::EthSpec>,
Option<SingleBlobsRequest<PARENT_FAIL_TOLERANCE, T::EthSpec>>,
),
>,
/// A cache of failed chain lookups to prevent duplicate searches.
failed_chains: LRUTimeCache<Hash256>,
@@ -59,14 +67,20 @@ pub(crate) struct BlockLookups<T: BeaconChainTypes> {
/// received or not.
///
/// The flag allows us to determine if the peer returned data or sent us nothing.
single_block_lookups: FnvHashMap<Id, SingleBlockRequest<SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS>>,
single_block_lookups:
FnvHashMap<Id, SingleBlockRequest<SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, T::EthSpec>>,
single_blob_lookups: FnvHashMap<Id, SingleBlobRequest<SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS>>,
single_blob_lookups:
FnvHashMap<Id, SingleBlobsRequest<SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, T::EthSpec>>,
/// The logger for the import manager.
log: Logger,
}
// 1. on a completed single block lookup or single blob lookup, don't send for processing if a parent
// chain is being requested or processed
// 2. when a chain is processed, find the child requests and send for processing
impl<T: BeaconChainTypes> BlockLookups<T> {
pub fn new(log: Logger) -> Self {
Self {
@@ -96,7 +110,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
if self.parent_lookups.iter_mut().any(|parent_req| {
parent_req.add_peer(&hash, &peer_id) || parent_req.contains_block(&hash)
parent_req.add_block_peer(&hash, &peer_id) || parent_req.contains_block(&hash)
}) {
// If the block was already downloaded, or is being downloaded in this moment, do not
// request it.
@@ -122,7 +136,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
let mut single_block_request = SingleBlockRequest::new(hash, peer_id);
let (peer_id, request) = single_block_request
.request_block()
.make_request()
.expect("none of the possible failure cases apply for a newly created block lookup");
if let Ok(request_id) = cx.single_block_lookup_request(peer_id, request) {
self.single_block_lookups
@@ -142,52 +156,61 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
peer_id: PeerId,
cx: &mut SyncNetworkContext<T>,
) {
// Do not re-request blobs that are already being requested
if self
.single_blob_lookups
.values_mut()
.any(|single_block_request| single_block_request.add_peer(&blob_ids, &peer_id))
{
return;
}
//
// if self.parent_lookups.iter_mut().any(|parent_req| {
// parent_req.add_peer(&hash, &peer_id) || parent_req.contains_block(&hash)
// }) {
// // If the block was already downloaded, or is being downloaded in this moment, do not
// // request it.
// return;
// }
//
// if self
// .processing_parent_lookups
// .values()
// .any(|(hashes, _last_parent_request)| hashes.contains(&hash))
// {
// // we are already processing this block, ignore it.
// return;
// }
//
// debug!(
// self.log,
// "Searching for block";
// "peer_id" => %peer_id,
// "block" => %hash
// );
//
// let mut single_block_request = SingleBlobRequest::new(hash, peer_id);
//
// let (peer_id, request) = single_block_request
// .request_block()
// .expect("none of the possible failure cases apply for a newly created block lookup");
// if let Ok(request_id) = cx.single_block_lookup_request(peer_id, request) {
// self.single_blob_lookups
// .insert(request_id, single_block_request);
//
// metrics::set_gauge(
// &metrics::SYNC_SINGLE_BLOB_LOOKUPS,
// self.single_blob_lookups.len() as i64,
// );
let to_request = blob_ids
.into_iter()
.filter(|id| {
// Do not re-request blobs that are already being requested
if self
.single_blob_lookups
.values_mut()
.any(|single_blob_request| {
single_blob_request.add_peer_if_useful(&blob_ids, &peer_id)
})
{
return false;
}
if self.parent_lookups.iter_mut().any(|parent_req| {
parent_req.add_blobs_peer(&blob_ids, &peer_id) || parent_req.contains_blob(id)
}) {
// If the blob was already downloaded, or is being downloaded in this moment, do not
// request it.
return false;
}
if self
.processing_parent_lookups
.values()
.any(|(hashes, _, _)| hashes.contains(&id.block_root))
{
// we are already processing this blob, ignore it.
return false;
}
true
})
.collect();
debug!(
self.log,
"Searching for blobs";
"peer_id" => %peer_id,
"blobs" => %to_request
);
let mut single_blob_request = SingleBlobsRequest::new(to_request, peer_id);
let (peer_id, request) = single_blob_request
.make_request()
.expect("none of the possible failure cases apply for a newly created blob lookup");
if let Ok(request_id) = cx.single_blobs_lookup_request(peer_id, request) {
self.single_blob_lookups
.insert(request_id, single_blob_request);
metrics::set_gauge(
&metrics::SYNC_SINGLE_BLOB_LOOKUPS,
self.single_blob_lookups.len() as i64,
);
}
}
pub fn search_block_delayed(
@@ -199,6 +222,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
) {
//TODO(sean) handle delay
//TODO(sean) cannot use peer id here cause it assumes it has the block, this is from gossip so not true
//
// after the delay expires, need to check da cache for what we have before requesting
self.search_block(hash, peer_id, cx);
}
@@ -211,6 +236,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
cx: &mut SyncNetworkContext<T>,
) {
//TODO(sean) handle delay
// after the delay expires, need to check da cache for what we have before requesting
self.search_blobs(block_root, blob_ids, peer_id, cx);
}
@@ -219,17 +245,10 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
pub fn search_parent(
&mut self,
block_root: Hash256,
block: BlockWrapper<T::EthSpec>,
block: MaybeAvailableBlock<T::EthSpec>,
peer_id: PeerId,
cx: &mut SyncNetworkContext<T>,
) {
//
// let missing_ids = cx.chain.data_availability_checker.get_missing_blob_ids(block, Some(root));
// // TODO(sean) how do we handle this erroring?
// if let Ok(missing_ids) = missing_ids {
// self.search_blobs(missing_ids, peer_id, cx);
// }
let parent_root = block.parent_root();
// If this block or it's parent is part of a known failed chain, ignore it.
if self.failed_chains.contains(&parent_root) || self.failed_chains.contains(&block_root) {
@@ -242,8 +261,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// being searched for.
if self.parent_lookups.iter_mut().any(|parent_req| {
parent_req.contains_block(&block_root)
|| parent_req.add_peer(&block_root, &peer_id)
|| parent_req.add_peer(&parent_root, &peer_id)
|| parent_req.add_block_peer(&block_root, &peer_id)
|| parent_req.add_block_peer(&parent_root, &peer_id)
}) {
// we are already searching for this block, ignore it
return;
@@ -259,7 +278,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
let parent_lookup = ParentLookup::new(block_root, block, peer_id);
self.request_parent(parent_lookup, cx);
self.request_parent_block_and_blobs(parent_lookup, cx);
}
/* Lookup responses */
@@ -285,8 +304,11 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
};
match request.get_mut().verify_block(block) {
match request.get_mut().verify_response(block) {
Ok(Some((block_root, block))) => {
//TODO(sean) only send for processing if we don't have parent requests trigger
// for this block
// This is the correct block, send it for processing
if self
.send_block_for_processing(
@@ -314,7 +336,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
debug!(self.log, "Single block lookup failed";
"peer_id" => %peer_id, "error" => msg, "block_root" => %req.requested_thing);
// try the request again if possible
if let Ok((peer_id, request)) = req.request_block() {
if let Ok((peer_id, request)) = req.make_request() {
if let Ok(id) = cx.single_block_lookup_request(peer_id, request) {
self.single_block_lookups.insert(id, req);
}
@@ -340,7 +362,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
let mut parent_lookup = if let Some(pos) = self
.parent_lookups
.iter()
.position(|request| request.pending_response(id))
.position(|request| request.pending_block_response(id))
{
self.parent_lookups.remove(pos)
} else {
@@ -352,19 +374,47 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
match parent_lookup.verify_block(block, &mut self.failed_chains) {
Ok(Some((block_root, block))) => {
// Block is correct, send to the beacon processor.
let chain_hash = parent_lookup.chain_hash();
if self
.send_block_for_processing(
block_root,
block,
seen_timestamp,
BlockProcessType::ParentLookup { chain_hash },
cx,
)
.is_ok()
{
self.parent_lookups.push(parent_lookup)
let block_wrapper = parent_lookup
.current_parent_blob_request
.as_ref()
.map_or(BlockWrapper::Block(block.clone()), |req| {
BlockWrapper::BlockAndBlobs(block, req.downloaded_blobs.clone())
});
let maybe_available = cx
.chain
.data_availability_checker
.check_availability(wrapper)
.unwrap(); //TODO(sean) remove unwrap
match maybe_available {
MaybeAvailableBlock::Available(available) => {
if self
.send_block_for_processing(
block_root,
available,
seen_timestamp,
BlockProcessType::ParentLookup { chain_hash },
cx,
)
.is_ok()
{
self.parent_lookups.push(parent_lookup)
}
}
MaybeAvailableBlock::AvailabilityPending(pending) => {
let missing_ids = pending.get_missing_blob_ids();
self.search_blobs(block_root, missing_ids, peer_id, cx);
let _ = parent_lookup
.current_parent_request
.downloaded_block
.insert((
block_root,
MaybeAvailableBlock::AvailabilityPending(pending),
));
self.parent_lookups.push(parent_lookup)
}
}
}
Ok(None) => {
@@ -385,7 +435,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
cx.report_peer(peer_id, PeerAction::LowToleranceError, e);
// We try again if possible.
self.request_parent(parent_lookup, cx);
self.request_parent_block(parent_lookup, cx);
}
VerifyError::PreviousFailure { parent_root } => {
debug!(
@@ -415,22 +465,180 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
&mut self,
id: Id,
peer_id: PeerId,
block: Option<Arc<BlobSidecar<T::EthSpec>>>,
blob: Option<Arc<BlobSidecar<T::EthSpec>>>,
seen_timestamp: Duration,
cx: &mut SyncNetworkContext<T>,
) {
todo!()
let mut request = match self.single_blob_lookups.entry(id) {
Entry::Occupied(req) => req,
Entry::Vacant(_) => {
if blob.is_some() {
debug!(
self.log,
"Block returned for single blob lookup not present"
);
}
return;
}
};
match request.get_mut().verify_blob(blob) {
Ok(Some((block_root, blobs))) => {
//TODO(sean) only send for processing if we don't have parent requests trigger
// for this block
// This is the correct block, send it for processing
if self
.send_block_for_processing(
block_root,
block,
seen_timestamp,
BlockProcessType::SingleBlock { id },
cx,
)
.is_err()
{
// Remove to avoid inconsistencies
self.single_block_lookups.remove(&id);
}
}
Ok(None) => {
// request finished correctly, it will be removed after the block is processed.
}
Err(error) => {
let msg: &str = error.into();
cx.report_peer(peer_id, PeerAction::LowToleranceError, msg);
// Remove the request, if it can be retried it will be added with a new id.
let mut req = request.remove();
debug!(self.log, "Single block lookup failed";
"peer_id" => %peer_id, "error" => msg, "block_root" => %req.requested_thing);
// try the request again if possible
if let Ok((peer_id, request)) = req.make_request() {
if let Ok(id) = cx.single_block_lookup_request(peer_id, request) {
self.single_block_lookups.insert(id, req);
}
}
}
}
metrics::set_gauge(
&metrics::SYNC_SINGLE_BLOCK_LOOKUPS,
self.single_block_lookups.len() as i64,
);
}
pub fn parent_lookup_blob_response(
&mut self,
id: Id,
peer_id: PeerId,
block: Option<Arc<BlobSidecar<T::EthSpec>>>,
blob: Option<Arc<BlobSidecar<T::EthSpec>>>,
seen_timestamp: Duration,
cx: &mut SyncNetworkContext<T>,
) {
todo!()
let mut parent_lookup = if let Some(pos) = self
.parent_lookups
.iter()
.position(|request| request.pending_blob_response(id))
{
self.parent_lookups.remove(pos)
} else {
if blob.is_some() {
debug!(self.log, "Response for a parent lookup blob request that was not found"; "peer_id" => %peer_id);
}
return;
};
match parent_lookup.verify_blob(blob, &mut self.failed_chains) {
Ok(Some(blobs)) => {
if let Some((block_root, block)) =
parent_lookup.current_parent_request.downloaded_block.take()
{
let block_wrapper = parent_lookup
.current_parent_blob_request
.as_ref()
.map_or(BlockWrapper::Block(block.clone()), |req| {
BlockWrapper::BlockAndBlobs(block, req.downloaded_blobs.clone())
});
let maybe_available = cx
.chain
.data_availability_checker
.check_availability(wrapper)
.unwrap(); //TODO(sean) remove unwrap
match maybe_available {
MaybeAvailableBlock::Available(available) => {
if self
.send_block_for_processing(
block_root,
available,
seen_timestamp,
BlockProcessType::ParentLookup { chain_hash },
cx,
)
.is_ok()
{
self.parent_lookups.push(parent_lookup)
}
}
MaybeAvailableBlock::AvailabilityPending(pending) => {
let missing_ids = pending.get_missing_blob_ids();
self.search_blobs(block_root, missing_ids, peer_id, cx);
parent_lookup
.current_parent_request
.downloaded_block
.insert((
block_root,
MaybeAvailableBlock::AvailabilityPending(pending),
));
self.parent_lookups.push(parent_lookup)
}
}
}
}
Ok(None) => {
// Request finished successfully, nothing else to do. It will be removed after the
// processing result arrives.
self.parent_lookups.push(parent_lookup);
}
Err(e) => match e.into() {
VerifyError::RootMismatch
| VerifyError::NoBlockReturned
| VerifyError::ExtraBlocksReturned => {
let e = e.into();
warn!(self.log, "Peer sent invalid response to parent request.";
"peer_id" => %peer_id, "reason" => %e);
// We do not tolerate these kinds of errors. We will accept a few but these are signs
// of a faulty peer.
cx.report_peer(peer_id, PeerAction::LowToleranceError, e);
// We try again if possible.
self.request_parent_blob(parent_lookup, cx);
}
VerifyError::PreviousFailure { parent_root } => {
debug!(
self.log,
"Parent chain ignored due to past failure";
"block" => %parent_root,
);
// Add the root block to failed chains
self.failed_chains.insert(parent_lookup.chain_hash());
cx.report_peer(
peer_id,
PeerAction::MidToleranceError,
"bbroot_failed_chains",
);
}
},
};
metrics::set_gauge(
&metrics::SYNC_PARENT_BLOCK_LOOKUPS,
self.parent_lookups.len() as i64,
);
}
/* Error responses */
@@ -457,7 +665,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
.collect::<Vec<_>>()
{
// retry the request
match req.request_block() {
match req.make_request() {
Ok((peer_id, block_request)) => {
if let Ok(request_id) = cx.single_block_lookup_request(peer_id, block_request) {
self.single_block_lookups.insert(request_id, req);
@@ -479,12 +687,14 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
while let Some(pos) = self
.parent_lookups
.iter_mut()
.position(|req| req.check_peer_disconnected(peer_id).is_err())
.position(|req| req.check_block_peer_disconnected(peer_id).is_err())
{
let parent_lookup = self.parent_lookups.remove(pos);
trace!(self.log, "Parent lookup's peer disconnected"; &parent_lookup);
self.request_parent(parent_lookup, cx);
self.request_parent_block_and_blobs(parent_lookup, cx);
}
//TODO(sean) add lookups for blobs
}
/// An RPC error has occurred during a parent lookup. This function handles this case.
@@ -498,13 +708,13 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
if let Some(pos) = self
.parent_lookups
.iter()
.position(|request| request.pending_response(id))
.position(|request| request.pending_block_response(id))
{
let mut parent_lookup = self.parent_lookups.remove(pos);
parent_lookup.download_failed();
parent_lookup.block_download_failed(id);
trace!(self.log, "Parent lookup request failed"; &parent_lookup);
self.request_parent(parent_lookup, cx);
self.request_parent_block(parent_lookup, cx);
} else {
return debug!(self.log, "RPC failure for a parent lookup request that was not found"; "peer_id" => %peer_id);
};
@@ -518,7 +728,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
if let Some(mut request) = self.single_block_lookups.remove(&id) {
request.register_failure_downloading();
trace!(self.log, "Single block lookup failed"; "block" => %request.requested_thing);
if let Ok((peer_id, block_request)) = request.request_block() {
if let Ok((peer_id, block_request)) = request.make_request() {
if let Ok(request_id) = cx.single_block_lookup_request(peer_id, block_request) {
self.single_block_lookups.insert(request_id, request);
}
@@ -536,7 +746,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
pub fn single_block_processed(
&mut self,
id: Id,
result: BlockProcessResult<T::EthSpec>,
result: BlockOrBlobProcessResult<T::EthSpec>,
cx: &mut SyncNetworkContext<T>,
) {
let mut req = match self.single_block_lookups.remove(&id) {
@@ -556,7 +766,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
};
match result {
BlockProcessResult::Ok(status) => match status {
BlockOrBlobProcessResult::Ok(status) => match status {
AvailabilityProcessingStatus::Imported(hash) => {
trace!(self.log, "Single block processing succeeded"; "block" => %root);
}
@@ -567,7 +777,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
warn!(self.log, "Block processed but returned PendingBlock"; "block" => %hash);
}
},
BlockProcessResult::Ignored => {
BlockOrBlobProcessResult::Ignored => {
// Beacon processor signalled to ignore the block processing result.
// This implies that the cpu is overloaded. Drop the request.
warn!(
@@ -576,7 +786,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
"action" => "dropping single block request"
);
}
BlockProcessResult::Err(e) => {
BlockOrBlobProcessResult::Err(e) => {
trace!(self.log, "Single block processing failed"; "block" => %root, "error" => %e);
match e {
BlockError::BlockIsAlreadyKnown => {
@@ -608,7 +818,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
);
// Try it again if possible.
req.register_failure_processing();
if let Ok((peer_id, request)) = req.request_block() {
if let Ok((peer_id, request)) = req.make_request() {
if let Ok(request_id) = cx.single_block_lookup_request(peer_id, request)
{
// insert with the new id
@@ -629,7 +839,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
pub fn parent_block_processed(
&mut self,
chain_hash: Hash256,
result: BlockProcessResult<T::EthSpec>,
result: BlockOrBlobProcessResult<T::EthSpec>,
cx: &mut SyncNetworkContext<T>,
) {
let (mut parent_lookup, peer_id) = if let Some((pos, peer)) = self
@@ -638,7 +848,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
.enumerate()
.find_map(|(pos, request)| {
request
.get_processing_peer(chain_hash)
.get_block_processing_peer(chain_hash)
.map(|peer| (pos, peer))
}) {
(self.parent_lookups.remove(pos), peer)
@@ -647,23 +857,25 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
};
match &result {
BlockProcessResult::Ok(status) => {
BlockOrBlobProcessResult::Ok(status) => {
match status {
AvailabilityProcessingStatus::Imported(hash) => {
trace!(self.log, "Parent block processing succeeded"; &parent_lookup)
}
AvailabilityProcessingStatus::PendingBlobs(block_root, blobs) => {
// trigger?
// make sure we have a pending blobs request outstanding
}
AvailabilityProcessingStatus::PendingBlock(hash) => {
// logic error
}
}
}
BlockProcessResult::Err(e) => {
BlockOrBlobProcessResult::Err(e) => {
trace!(self.log, "Parent block processing failed"; &parent_lookup, "error" => %e)
}
BlockProcessResult::Ignored => {
BlockOrBlobProcessResult::Ignored => {
trace!(
self.log,
"Parent block processing job was ignored";
@@ -674,27 +886,21 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
match result {
BlockProcessResult::Ok(AvailabilityProcessingStatus::PendingBlock(_)) => {
BlockOrBlobProcessResult::Ok(AvailabilityProcessingStatus::PendingBlock(_)) => {
// doesn't make sense
}
BlockProcessResult::Ok(AvailabilityProcessingStatus::PendingBlobs(block_root, blobs_ids)) => {
self.search_blobs(block_root, blobs_ids, peer_id, cx);
BlockOrBlobProcessResult::Ok(AvailabilityProcessingStatus::PendingBlobs(
block_root,
blobs_ids,
)) => {
self.search_blobs(block_root, blobs_ids, peer_id, cx);
}
BlockProcessResult::Err(BlockError::ParentUnknown(block)) => {
// TODO(sean) how do we handle this erroring?
let missing_ids = cx
.chain
.data_availability_checker
.get_missing_blob_ids(block.clone(), None)
.unwrap_or_default();
if let Some(block_root) = missing_ids.first().map(|first_id| first_id.block_root){
self.search_blobs(block_root, missing_ids, peer_id, cx);
}
BlockOrBlobProcessResult::Err(BlockError::ParentUnknown(block)) => {
parent_lookup.add_block(block);
self.request_parent(parent_lookup, cx);
self.request_parent_block_and_blobs(parent_lookup, cx);
}
BlockProcessResult::Ok(AvailabilityProcessingStatus::Imported(_))
| BlockProcessResult::Err(BlockError::BlockIsAlreadyKnown { .. }) => {
BlockOrBlobProcessResult::Ok(AvailabilityProcessingStatus::Imported(_))
| BlockOrBlobProcessResult::Err(BlockError::BlockIsAlreadyKnown { .. }) => {
// Check if the beacon processor is available
let beacon_processor_send = match cx.processor_channel_if_enabled() {
Some(channel) => channel,
@@ -706,7 +912,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
);
}
};
let (chain_hash, blocks, hashes, request) = parent_lookup.parts_for_processing();
let (chain_hash, blocks, hashes, block_request, blob_request) =
parent_lookup.parts_for_processing();
let process_id = ChainSegmentProcessId::ParentLookup(chain_hash);
let work = WorkEvent::chain_segment(process_id, blocks);
@@ -714,7 +921,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
match beacon_processor_send.try_send(work) {
Ok(_) => {
self.processing_parent_lookups
.insert(chain_hash, (hashes, request));
.insert(chain_hash, (hashes, block_request));
}
Err(e) => {
error!(
@@ -725,7 +932,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
}
}
ref e @ BlockProcessResult::Err(BlockError::ExecutionPayloadError(ref epe))
ref e @ BlockOrBlobProcessResult::Err(BlockError::ExecutionPayloadError(ref epe))
if !epe.penalize_peer() =>
{
// These errors indicate that the execution layer is offline
@@ -737,7 +944,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
"error" => ?e
);
}
BlockProcessResult::Err(outcome) => {
BlockOrBlobProcessResult::Err(outcome) => {
// all else we consider the chain a failure and downvote the peer that sent
// us the last block
warn!(
@@ -752,10 +959,10 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
cx.report_peer(peer_id, PeerAction::MidToleranceError, "parent_request_err");
// Try again if possible
parent_lookup.processing_failed();
self.request_parent(parent_lookup, cx);
parent_lookup.block_processing_failed();
self.request_parent_block(parent_lookup, cx);
}
BlockProcessResult::Ignored => {
BlockOrBlobProcessResult::Ignored => {
// Beacon processor signalled to ignore the block processing result.
// This implies that the cpu is overloaded. Drop the request.
warn!(
@@ -772,24 +979,6 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
);
}
pub fn single_blob_processed(
&mut self,
id: Id,
result: BlockProcessResult<T::EthSpec>,
cx: &mut SyncNetworkContext<T>,
) {
todo!()
}
pub fn parent_blob_processed(
&mut self,
chain_hash: Hash256,
result: BlockProcessResult<T::EthSpec>,
cx: &mut SyncNetworkContext<T>,
) {
todo!()
}
pub fn parent_chain_processed(
&mut self,
chain_hash: Hash256,
@@ -806,7 +995,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
debug!(self.log, "Parent chain processed"; "chain_hash" => %chain_hash, "result" => ?result);
match result {
BatchProcessResult::Success { .. } => {
// nothing to do.
//TODO(sean) find single blob and block lookups and send for processing
}
BatchProcessResult::FaultyFailure {
imported_blocks: _,
@@ -833,7 +1022,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
fn send_block_for_processing(
&mut self,
block_root: Hash256,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
block: BlockWrapper<T::EthSpec>,
duration: Duration,
process_type: BlockProcessType,
cx: &mut SyncNetworkContext<T>,
@@ -860,12 +1049,42 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
}
fn request_parent(
fn request_parent_block(
&mut self,
mut parent_lookup: ParentLookup<T>,
cx: &mut SyncNetworkContext<T>,
) {
match parent_lookup.request_parent(cx) {
let response = parent_lookup.request_parent_block(cx);
self.handle_response(parent_lookup, response);
}
fn request_parent_blob(
&mut self,
mut parent_lookup: ParentLookup<T>,
cx: &mut SyncNetworkContext<T>,
) {
let response = parent_lookup.request_parent_blobs(cx);
self.handle_response(parent_lookup, response);
}
fn request_parent_block_and_blobs(
&mut self,
mut parent_lookup: ParentLookup<T>,
cx: &mut SyncNetworkContext<T>,
) {
let response = parent_lookup
.request_parent_block(cx)
.and_then(|| parent_lookup.request_parent_blobs(cx));
self.handle_response(parent_lookup, response);
}
//TODO(sean) how should peer scoring work with failures in this method?
fn handle_response(
&mut self,
mut parent_lookup: ParentLookup<T>,
result: Result<(), parent_lookup::RequestError>,
) {
match result {
Err(e) => {
debug!(self.log, "Failed to request parent"; &parent_lookup, "error" => e.as_static());
match e {
@@ -875,7 +1094,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
parent_lookup::RequestError::ChainTooLong => {
self.failed_chains.insert(parent_lookup.chain_hash());
// This indicates faulty peers.
for &peer_id in parent_lookup.used_peers() {
for &peer_id in parent_lookup.used_block_peers() {
cx.report_peer(peer_id, PeerAction::LowToleranceError, e.as_static())
}
}
@@ -888,7 +1107,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
self.failed_chains.insert(parent_lookup.chain_hash());
}
// This indicates faulty peers.
for &peer_id in parent_lookup.used_peers() {
for &peer_id in parent_lookup.used_block_peers() {
cx.report_peer(peer_id, PeerAction::LowToleranceError, e.as_static())
}
}