lots of refactoring

This commit is contained in:
realbigsean
2023-05-19 17:47:44 -04:00
parent bef63e42f7
commit c7aad773a8
8 changed files with 408 additions and 382 deletions

View File

@@ -546,7 +546,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
);
}
self.request_parent_blob(parent_lookup, cx);
self.request_parent_blobs(parent_lookup, cx);
} else {
self.parent_lookups.push(parent_lookup)
}
@@ -557,53 +557,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// processing result arrives.
self.parent_lookups.push(parent_lookup);
}
Err(e) => match e {
ParentVerifyError::RootMismatch
| ParentVerifyError::NoBlockReturned
| ParentVerifyError::NotEnoughBlobsReturned
| ParentVerifyError::ExtraBlocksReturned
| ParentVerifyError::UnrequestedBlobId
| ParentVerifyError::ExtraBlobsReturned
| ParentVerifyError::InvalidIndex(_) => {
let e = e.into();
warn!(self.log, "Peer sent invalid response to parent request.";
"peer_id" => %peer_id, "reason" => %e);
// We do not tolerate these kinds of errors. We will accept a few but these are signs
// of a faulty peer.
cx.report_peer(peer_id, PeerAction::LowToleranceError, e);
// We try again if possible.
self.request_parent_block(parent_lookup, cx);
}
ParentVerifyError::PreviousFailure { parent_root } => {
debug!(
self.log,
"Parent chain ignored due to past failure";
"block" => %parent_root,
);
// Add the root block to failed chains
self.failed_chains.insert(parent_lookup.chain_hash());
cx.report_peer(
peer_id,
PeerAction::MidToleranceError,
"bbroot_failed_chains",
);
}
ParentVerifyError::BenignFailure => {
trace!(
self.log,
"Requested peer could not respond to block request, requesting a new peer";
);
parent_lookup
.current_parent_request
.block_request_state
.potential_peers
.remove(&peer_id);
self.request_parent_block(parent_lookup, cx);
}
},
Err(e) => {
self.handle_parent_verify_error(peer_id, parent_lookup, ResponseType::Block, e, cx)
}
};
metrics::set_gauge(
@@ -660,53 +616,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// Waiting for more blobs to arrive
self.parent_lookups.push(parent_lookup);
}
Err(e) => match e {
ParentVerifyError::RootMismatch
| ParentVerifyError::NoBlockReturned
| ParentVerifyError::NotEnoughBlobsReturned
| ParentVerifyError::ExtraBlocksReturned
| ParentVerifyError::UnrequestedBlobId
| ParentVerifyError::ExtraBlobsReturned
| ParentVerifyError::InvalidIndex(_) => {
let e = e.into();
warn!(self.log, "Peer sent invalid response to parent request.";
"peer_id" => %peer_id, "reason" => %e);
// We do not tolerate these kinds of errors. We will accept a few but these are signs
// of a faulty peer.
cx.report_peer(peer_id, PeerAction::LowToleranceError, e);
// We try again if possible.
self.request_parent_blob(parent_lookup, cx);
}
ParentVerifyError::PreviousFailure { parent_root } => {
debug!(
self.log,
"Parent chain ignored due to past failure";
"block" => %parent_root,
);
// Add the root block to failed chains
self.failed_chains.insert(parent_lookup.chain_hash());
cx.report_peer(
peer_id,
PeerAction::MidToleranceError,
"bbroot_failed_chains",
);
}
ParentVerifyError::BenignFailure => {
trace!(
self.log,
"Requested peer could not respond to blob request, requesting a new peer";
);
parent_lookup
.current_parent_request
.blob_request_state
.potential_peers
.remove(&peer_id);
self.request_parent_blob(parent_lookup, cx);
}
},
Err(e) => {
self.handle_parent_verify_error(peer_id, parent_lookup, ResponseType::Blob, e, cx)
}
};
metrics::set_gauge(
@@ -715,36 +627,91 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
);
}
/// Handles an error returned while verifying a parent-lookup response.
///
/// Depending on the error kind this either penalizes the peer and retries the
/// request, records the chain as failed, or drops a useless peer and retries
/// with another one. `response_type` selects whether the block or the blob
/// request path is retried.
fn handle_parent_verify_error(
    &mut self,
    peer_id: PeerId,
    mut parent_lookup: ParentLookup<T>,
    response_type: ResponseType,
    e: ParentVerifyError,
    cx: &mut SyncNetworkContext<T>,
) {
    match e {
        // Malformed or inconsistent responses: penalize the peer and retry.
        ParentVerifyError::RootMismatch
        | ParentVerifyError::NoBlockReturned
        | ParentVerifyError::NotEnoughBlobsReturned
        | ParentVerifyError::ExtraBlocksReturned
        | ParentVerifyError::UnrequestedBlobId
        | ParentVerifyError::ExtraBlobsReturned
        | ParentVerifyError::InvalidIndex(_) => {
            let e = e.into();
            warn!(self.log, "Peer sent invalid response to parent request.";
                "peer_id" => %peer_id, "reason" => %e);
            // We do not tolerate these kinds of errors. We will accept a few but these are signs
            // of a faulty peer.
            cx.report_peer(peer_id, PeerAction::LowToleranceError, e);
            // We try again if possible.
            match response_type {
                ResponseType::Block => self.request_parent_block(parent_lookup, cx),
                ResponseType::Blob => self.request_parent_blobs(parent_lookup, cx),
            };
        }
        ParentVerifyError::PreviousFailure { parent_root } => {
            debug!(
                self.log,
                "Parent chain ignored due to past failure";
                "block" => %parent_root,
            );
            // Add the root block to failed chains
            self.failed_chains.insert(parent_lookup.chain_hash());
            cx.report_peer(
                peer_id,
                PeerAction::MidToleranceError,
                "bbroot_failed_chains",
            );
        }
        ParentVerifyError::BenignFailure => {
            // Not the peer's fault; drop it only if better options exist and
            // retry with a new peer. The message is kept response-type-neutral
            // because this path now serves both block and blob requests.
            trace!(
                self.log,
                "Requested peer could not respond to parent request, requesting a new peer";
                "response_type" => ?response_type,
            );
            parent_lookup
                .current_parent_request
                .remove_peer_if_useless(&peer_id, response_type);
            match response_type {
                ResponseType::Block => self.request_parent_block(parent_lookup, cx),
                ResponseType::Blob => self.request_parent_blobs(parent_lookup, cx),
            };
        }
    }
}
/* Error responses */
pub fn peer_disconnected(&mut self, peer_id: &PeerId, cx: &mut SyncNetworkContext<T>) {
self.single_block_lookups
.retain_mut(|(block_id_opt, blob_id_opt, req)| {
let should_remove_block = block_id_opt
.as_mut()
.filter(|_| req.block_request_state
.check_peer_disconnected(peer_id)
.is_err())
.map(|block_id| {
let should_remove_block = should_remove_disconnected_peer(
block_id_opt,
ResponseType::Block,
peer_id,
cx,
req,
&self.log,
);
let should_remove_blob = should_remove_disconnected_peer(
blob_id_opt,
ResponseType::Blob,
peer_id,
cx,
req,
&self.log,
);
trace!(self.log, "Single block lookup failed on peer disconnection"; "block_root" => ?req.requested_block_root, );
retry_request_after_failure(block_id, req, ResponseType::Block, peer_id, cx, &self.log)
})
.unwrap_or(ShouldRemoveLookup::False);
let should_remove_blob = blob_id_opt
.as_mut()
.filter(|_| req.blob_request_state
.check_peer_disconnected(peer_id)
.is_err())
.map(|blob_id| {
trace!(self.log, "Single blob lookup failed on peer disconnection"; "block_root" => ?req.requested_block_root, );
retry_request_after_failure(blob_id, req, ResponseType::Blob, peer_id, cx, &self.log)
})
.unwrap_or(ShouldRemoveLookup::False);
matches!(should_remove_block, ShouldRemoveLookup::False) && matches!(should_remove_blob, ShouldRemoveLookup::False)
matches!(should_remove_block, ShouldRemoveLookup::False)
&& matches!(should_remove_blob, ShouldRemoveLookup::False)
});
/* Check disconnection for parent lookups */
@@ -790,7 +757,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
parent_lookup.blob_download_failed();
trace!(self.log, "Parent lookup blobs request failed"; &parent_lookup, "error" => msg);
self.request_parent_blob(parent_lookup, cx);
self.request_parent_blobs(parent_lookup, cx);
} else {
return debug!(self.log, "RPC failure for a blobs parent lookup request that was not found"; "peer_id" => %peer_id, "error" => msg);
};
@@ -808,30 +775,32 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
error: RPCError,
) {
let msg = error.as_static_str();
self.single_block_lookups.retain_mut(|(block_id_opt, blob_id_opt, req)|{
self.single_block_lookups
.retain_mut(|(block_id_opt, blob_id_opt, req)| {
let should_remove_block = should_remove_failed_lookup(
block_id_opt,
ResponseType::Block,
id,
msg,
peer_id,
cx,
req,
&self.log,
);
let should_remove_blob = should_remove_failed_lookup(
blob_id_opt,
ResponseType::Blob,
id,
msg,
peer_id,
cx,
req,
&self.log,
);
let should_remove_block = block_id_opt
.as_mut()
.filter(|block_id| **block_id == id)
.map(|block_id| {
req.block_request_state.register_failure_downloading();
trace!(self.log, "Single block lookup failed"; "block" => %req.requested_block_root, "error" => msg);
retry_request_after_failure(block_id, req, ResponseType::Block, peer_id, cx, &self.log)
})
.unwrap_or(ShouldRemoveLookup::False);
let should_remove_blob = blob_id_opt
.as_mut()
.filter(|blob_id| **blob_id == id)
.map(|blob_id| {
req.blob_request_state.register_failure_downloading();
trace!(self.log, "Single blob lookup failed"; "block" => %req.requested_block_root, "error" =>msg);
retry_request_after_failure(blob_id, req, ResponseType::Block, peer_id, cx, &self.log)
})
.unwrap_or(ShouldRemoveLookup::False);
matches!(should_remove_block, ShouldRemoveLookup::False) && matches!(should_remove_blob, ShouldRemoveLookup::False)
});
matches!(should_remove_block, ShouldRemoveLookup::False)
&& matches!(should_remove_blob, ShouldRemoveLookup::False)
});
metrics::set_gauge(
&metrics::SYNC_SINGLE_BLOCK_LOOKUPS,
@@ -850,13 +819,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
) {
let lookup_components_opt = self.single_block_lookups.iter_mut().enumerate().find_map(
|(index, (block_id_opt, blob_id_opt, req))| {
let id_filter = |id: &Id| -> bool { target_id == *id };
let block_matches = block_id_opt.as_ref().map(id_filter).unwrap_or(false);
let blob_matches = blob_id_opt.as_ref().map(id_filter).unwrap_or(false);
if !block_matches && !blob_matches {
return None;
}
Some((index, block_id_opt, blob_id_opt, req))
let block_match = block_id_opt.as_ref() == Some(&target_id);
let blob_match = blob_id_opt.as_ref() == Some(&target_id);
(block_match || blob_match).then_some((index, block_id_opt, blob_id_opt, req))
},
);
let (index, block_id_ref, blob_id_ref, request_ref) = match lookup_components_opt {
@@ -883,116 +848,14 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
trace!(self.log, "Single block processing succeeded"; "block" => %root);
ShouldRemoveLookup::True
}
AvailabilityProcessingStatus::MissingComponents(_, block_root) => {
// if peer should have both, and missing components is received after we've
// processed the opposite, then we can downscore.
let should_remove = match response_type {
ResponseType::Block => {
if request_ref.blob_request_state.component_processed {
match request_ref.processing_peer(ResponseType::Blob) {
Ok(PeerShouldHave::BlockAndBlobs(other_peer)) => {
cx.report_peer(
peer_id.to_peer_id(),
PeerAction::MidToleranceError,
"single_block_failure",
);
if let Some(blob_id_ref) = blob_id_ref {
// Try it again if possible.
retry_request_after_failure(
blob_id_ref,
request_ref,
ResponseType::Blob,
peer_id.as_peer_id(),
cx,
&self.log,
)
} else {
ShouldRemoveLookup::False
}
}
Ok(PeerShouldHave::Neither(other_peer)) => {
request_ref
.blob_request_state
.remove_peer_if_useless(&other_peer);
if let Some(blob_id_ref) = blob_id_ref {
// Try it again if possible.
retry_request_after_failure(
blob_id_ref,
request_ref,
ResponseType::Blob,
peer_id.as_peer_id(),
cx,
&self.log,
)
} else {
ShouldRemoveLookup::False
}
}
Err(()) => {
//TODO(sean) retry?
ShouldRemoveLookup::False
}
}
} else {
request_ref.block_request_state.component_processed = true;
ShouldRemoveLookup::False
}
}
ResponseType::Blob => {
if request_ref.block_request_state.component_processed {
match request_ref.processing_peer(ResponseType::Blob) {
Ok(PeerShouldHave::BlockAndBlobs(other_peer)) => {
cx.report_peer(
other_peer,
PeerAction::MidToleranceError,
"single_block_failure",
);
if let Some(blob_id_ref) = blob_id_ref {
// Try it again if possible.
retry_request_after_failure(
blob_id_ref,
request_ref,
ResponseType::Blob,
peer_id.as_peer_id(),
cx,
&self.log,
)
} else {
ShouldRemoveLookup::False
}
}
Ok(PeerShouldHave::Neither(other_peer)) => {
request_ref
.blob_request_state
.remove_peer_if_useless(&other_peer);
if let Some(blob_id_ref) = blob_id_ref {
// Try it again if possible.
retry_request_after_failure(
blob_id_ref,
request_ref,
ResponseType::Blob,
peer_id.as_peer_id(),
cx,
&self.log,
)
} else {
ShouldRemoveLookup::False
}
}
Err(()) => {
//TODO(sean) retry?
ShouldRemoveLookup::False
}
}
} else {
// retry block here?
request_ref.blob_request_state.component_processed = true;
ShouldRemoveLookup::False
}
}
};
should_remove
AvailabilityProcessingStatus::MissingComponents(_, _block_root) => {
should_remove_missing_components(
request_ref,
response_type,
blob_id_ref,
cx,
&self.log,
)
}
},
BlockProcessingResult::Ignored => {
@@ -1018,13 +881,14 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
ShouldRemoveLookup::True
}
BlockError::ParentUnknown(block) => {
self.search_parent(
block.slot(),
root,
block.parent_root(),
peer_id.to_peer_id(),
cx,
);
let slot = block.slot();
let parent_root = block.parent_root();
let (block, blobs) = block.deconstruct();
request_ref.add_unknown_parent_block(block);
if let Some(blobs) = blobs {
request_ref.add_unknown_parent_blobs(blobs);
}
self.search_parent(slot, root, parent_root, peer_id.to_peer_id(), cx);
ShouldRemoveLookup::False
}
ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => {
@@ -1047,27 +911,32 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
);
if let Some(blob_id_ref) = blob_id_ref {
// Try it again if possible.
retry_request_after_failure(
blob_id_ref,
request_ref,
ResponseType::Blob,
peer_id.as_peer_id(),
cx,
&self.log,
)
} else if let Some(block_id_ref) = block_id_ref {
// Try it again if possible.
retry_request_after_failure(
block_id_ref,
request_ref,
ResponseType::Block,
peer_id.as_peer_id(),
cx,
&self.log,
)
} else {
ShouldRemoveLookup::True
if !request_ref.awaiting_download(ResponseType::Blob) {
retry_request_after_failure(
blob_id_ref,
request_ref,
ResponseType::Blob,
peer_id.as_peer_id(),
cx,
&self.log,
);
}
}
if let Some(block_id_ref) = block_id_ref {
if !request_ref.awaiting_download(ResponseType::Block) {
// Try it again if possible.
retry_request_after_failure(
block_id_ref,
request_ref,
ResponseType::Block,
peer_id.as_peer_id(),
cx,
&self.log,
);
}
}
ShouldRemoveLookup::False
}
}
}
@@ -1396,7 +1265,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
self.handle_response(parent_lookup, cx, response, ResponseType::Block);
}
fn request_parent_blob(
fn request_parent_blobs(
&mut self,
mut parent_lookup: ParentLookup<T>,
cx: &mut SyncNetworkContext<T>,
@@ -1498,19 +1367,7 @@ fn handle_block_lookup_verify_error<T: BeaconChainTypes>(
log: &Logger,
) -> ShouldRemoveLookup {
let msg = if matches!(e, LookupVerifyError::BenignFailure) {
match response_type {
// Only remove a potential peer if there are better options
ResponseType::Block => {
request_ref
.block_request_state
.remove_peer_if_useless(&peer_id);
}
ResponseType::Blob => {
request_ref
.blob_request_state
.remove_peer_if_useless(&peer_id);
}
};
request_ref.remove_peer_if_useless(&peer_id, response_type);
"peer could not response to request"
} else {
let msg = e.into();
@@ -1580,3 +1437,83 @@ fn retry_request_after_failure<T: BeaconChainTypes>(
}
ShouldRemoveLookup::False
}
/// On a peer disconnection, checks whether the request identified by `id`
/// was relying on that peer and, if so, retries it after registering the
/// failure. Returns whether the whole lookup should be removed.
fn should_remove_disconnected_peer<T: BeaconChainTypes>(
    id: &mut Option<Id>,
    response_type: ResponseType,
    peer_id: &PeerId,
    cx: &mut SyncNetworkContext<T>,
    req: &mut SingleBlockLookup<SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, T>,
    log: &Logger,
) -> ShouldRemoveLookup {
    // No active request of this type: nothing to do.
    let Some(request_id) = id.as_mut() else {
        return ShouldRemoveLookup::False;
    };
    // The disconnected peer was not the one serving this request.
    if req.check_peer_disconnected(peer_id, response_type).is_ok() {
        return ShouldRemoveLookup::False;
    }
    trace!(log, "Single lookup failed on peer disconnection"; "block_root" => ?req.requested_block_root, "response_type" => ?response_type);
    retry_request_after_failure(request_id, req, response_type, peer_id, cx, log)
}
/// On an RPC failure for request `target_id`, registers a download failure on
/// the matching request (if `id` matches) and retries it. Returns whether the
/// whole lookup should be removed.
fn should_remove_failed_lookup<T: BeaconChainTypes>(
    id: &mut Option<Id>,
    response_type: ResponseType,
    target_id: Id,
    msg: &'static str,
    peer_id: &PeerId,
    cx: &mut SyncNetworkContext<T>,
    req: &mut SingleBlockLookup<SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, T>,
    log: &Logger,
) -> ShouldRemoveLookup {
    match id.as_mut() {
        // Only act when the failed RPC id belongs to this request.
        Some(request_id) if *request_id == target_id => {
            req.register_failure_downloading(response_type);
            trace!(log, "Single lookup failed"; "block" => %req.requested_block_root, "error" => msg, "response_type" => ?response_type);
            retry_request_after_failure(request_id, req, response_type, peer_id, cx, log)
        }
        _ => ShouldRemoveLookup::False,
    }
}
/// Called when block processing reports missing components for this lookup.
///
/// Marks the `response_type` component as processed. Once both components have
/// been processed, the blobs are assumed to be what is missing: the blob peer
/// is penalized when it claimed to have both block and blobs, dropped if
/// useless, and the blob request is retried when possible. Returns whether the
/// whole lookup should be removed.
fn should_remove_missing_components<T: BeaconChainTypes>(
request_ref: &mut SingleBlockLookup<SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, T>,
response_type: ResponseType,
blob_id_ref: &mut Option<Id>,
cx: &mut SyncNetworkContext<T>,
log: &Logger,
) -> ShouldRemoveLookup {
request_ref.set_component_processed(response_type);
// If we get a missing component response after processing both a blob and a block response, the
// blobs must be what are missing.
if request_ref.both_components_processed() {
// No peer recorded as serving the blob request: nothing to penalize or retry.
let Ok(blob_peer) = request_ref.processing_peer(ResponseType::Blob) else {
return ShouldRemoveLookup::False;
};
// Only penalize a peer that advertised having both block and blobs; a
// `Neither` peer was a best-effort source and is not at fault.
if let PeerShouldHave::BlockAndBlobs(blob_peer) = blob_peer {
cx.report_peer(
blob_peer,
PeerAction::MidToleranceError,
"single_block_failure",
);
}
// Drop the peer from the blob request if better options exist.
request_ref.remove_peer_if_useless(blob_peer.as_peer_id(), ResponseType::Blob);
if let Some(blob_id_ref) = blob_id_ref {
// Only retry when the blob request is not currently in the
// `AwaitingDownload` state (same guard as the other retry sites).
if !request_ref.awaiting_download(ResponseType::Blob) {
// Try it again if possible.
return retry_request_after_failure(
blob_id_ref,
request_ref,
ResponseType::Blob,
blob_peer.as_peer_id(),
cx,
log,
);
}
}
}
ShouldRemoveLookup::False
}

View File

@@ -8,14 +8,14 @@ use crate::sync::{
};
use beacon_chain::blob_verification::AsBlock;
use beacon_chain::blob_verification::BlockWrapper;
use beacon_chain::data_availability_checker::{AvailabilityCheckError, DataAvailabilityChecker};
use beacon_chain::data_availability_checker::DataAvailabilityChecker;
use beacon_chain::BeaconChainTypes;
use lighthouse_network::PeerId;
use std::sync::Arc;
use store::Hash256;
use strum::IntoStaticStr;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{BlobSidecar, EthSpec, SignedBeaconBlock};
use types::{BlobSidecar, SignedBeaconBlock};
/// How many attempts we try to find a parent of a block before we give up trying.
pub(crate) const PARENT_FAIL_TOLERANCE: u8 = 5;

View File

@@ -29,6 +29,44 @@ pub struct SingleBlockLookup<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> {
pub unknown_parent_components: Option<UnknownParentComponents<T::EthSpec>>,
}
impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS, T> {
/// Records a download failure on the block or blob request state, selected
/// by `response_type`.
pub(crate) fn register_failure_downloading(&mut self, response_type: ResponseType) {
match response_type {
ResponseType::Block => self.block_request_state.register_failure_downloading(),
ResponseType::Blob => self.blob_request_state.register_failure_downloading(),
}
}
}
impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS, T> {
/// Returns `true` when the request state selected by `response_type` is in
/// the `AwaitingDownload` state.
pub(crate) fn awaiting_download(&mut self, response_type: ResponseType) -> bool {
match response_type {
ResponseType::Block => {
matches!(self.block_request_state.state, State::AwaitingDownload)
}
ResponseType::Blob => matches!(self.blob_request_state.state, State::AwaitingDownload),
}
}
/// Removes `peer_id` from the request state selected by `response_type`;
/// the underlying state only drops the peer if better options remain.
pub(crate) fn remove_peer_if_useless(&mut self, peer_id: &PeerId, response_type: ResponseType) {
match response_type {
ResponseType::Block => self.block_request_state.remove_peer_if_useless(peer_id),
ResponseType::Blob => self.blob_request_state.remove_peer_if_useless(peer_id),
}
}
/// Forwards a peer-disconnection check to the request state selected by
/// `response_type`. `Err(())` indicates the request was depending on the
/// disconnected peer.
pub(crate) fn check_peer_disconnected(
&mut self,
peer_id: &PeerId,
response_type: ResponseType,
) -> Result<(), ()> {
match response_type {
ResponseType::Block => self.block_request_state.check_peer_disconnected(peer_id),
ResponseType::Blob => self.blob_request_state.check_peer_disconnected(peer_id),
}
}
}
#[derive(Default)]
pub struct UnknownParentComponents<E: EthSpec> {
pub downloaded_block: Option<Arc<SignedBeaconBlock<E>>>,
@@ -317,41 +355,17 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
self.block_request_state.state,
State::AwaitingDownload
));
if self.block_request_state.failed_attempts() >= MAX_ATTEMPTS {
let request = BlocksByRootRequest {
block_roots: VariableList::from(vec![self.requested_block_root]),
};
let response_type = ResponseType::Block;
if self.too_many_attempts(response_type) {
Err(LookupRequestError::TooManyAttempts {
cannot_process: self.block_request_state.failed_processing
>= self.block_request_state.failed_downloading,
cannot_process: self.cannot_process(response_type),
})
} else if let Some(&peer_id) = self
.block_request_state
.available_peers
.iter()
.choose(&mut rand::thread_rng())
{
let request = BlocksByRootRequest {
block_roots: VariableList::from(vec![self.requested_block_root]),
};
self.block_request_state.used_peers.insert(peer_id);
let peer_source = PeerShouldHave::BlockAndBlobs(peer_id);
self.block_request_state.state = State::Downloading {
peer_id: peer_source,
};
Ok(Some((peer_id, request)))
} else if let Some(&peer_id) = self
.block_request_state
.potential_peers
.iter()
.choose(&mut rand::thread_rng())
{
let request = BlocksByRootRequest {
block_roots: VariableList::from(vec![self.requested_block_root]),
};
self.block_request_state.used_peers.insert(peer_id);
let peer_source = PeerShouldHave::Neither(peer_id);
self.block_request_state.state = State::Downloading {
peer_id: peer_source,
};
Ok(Some((peer_id, request)))
} else if let Some(peer_id) = self.get_peer(response_type) {
self.add_used_peer(peer_id, response_type);
Ok(Some((peer_id.to_peer_id(), request)))
} else {
Err(LookupRequestError::NoPeers)
}
@@ -370,46 +384,92 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
self.blob_request_state.state,
State::AwaitingDownload
));
if self.blob_request_state.failed_attempts() >= MAX_ATTEMPTS {
let request = BlobsByRootRequest {
blob_ids: VariableList::from(self.requested_ids.clone()),
};
let response_type = ResponseType::Blob;
if self.too_many_attempts(response_type) {
Err(LookupRequestError::TooManyAttempts {
cannot_process: self.blob_request_state.failed_processing
>= self.blob_request_state.failed_downloading,
cannot_process: self.cannot_process(response_type),
})
} else if let Some(&peer_id) = self
.blob_request_state
.available_peers
.iter()
.choose(&mut rand::thread_rng())
{
let request = BlobsByRootRequest {
blob_ids: VariableList::from(self.requested_ids.clone()),
};
self.blob_request_state.used_peers.insert(peer_id);
let peer_source = PeerShouldHave::BlockAndBlobs(peer_id);
self.blob_request_state.state = State::Downloading {
peer_id: peer_source,
};
Ok(Some((peer_id, request)))
} else if let Some(&peer_id) = self
.blob_request_state
.potential_peers
.iter()
.choose(&mut rand::thread_rng())
{
let request = BlobsByRootRequest {
blob_ids: VariableList::from(self.requested_ids.clone()),
};
self.blob_request_state.used_peers.insert(peer_id);
let peer_source = PeerShouldHave::Neither(peer_id);
self.blob_request_state.state = State::Downloading {
peer_id: peer_source,
};
Ok(Some((peer_id, request)))
} else if let Some(peer_id) = self.get_peer(response_type) {
self.add_used_peer(peer_id, response_type);
Ok(Some((peer_id.to_peer_id(), request)))
} else {
Err(LookupRequestError::NoPeers)
}
}
/// Returns `true` when the failure count for the request selected by
/// `response_type` has reached `MAX_ATTEMPTS`.
fn too_many_attempts(&self, response_type: ResponseType) -> bool {
    let failed_attempts = match response_type {
        ResponseType::Block => self.block_request_state.failed_attempts(),
        ResponseType::Blob => self.blob_request_state.failed_attempts(),
    };
    failed_attempts >= MAX_ATTEMPTS
}
/// Returns `true` when the request selected by `response_type` has failed
/// processing at least as often as downloading, i.e. retrying the download
/// is unlikely to help.
fn cannot_process(&self, response_type: ResponseType) -> bool {
    let (failed_processing, failed_downloading) = match response_type {
        ResponseType::Block => (
            self.block_request_state.failed_processing,
            self.block_request_state.failed_downloading,
        ),
        ResponseType::Blob => (
            self.blob_request_state.failed_processing,
            self.blob_request_state.failed_downloading,
        ),
    };
    failed_processing >= failed_downloading
}
/// Picks a random peer for the request selected by `response_type`.
///
/// Peers in `available_peers` (expected to have both block and blobs) are
/// preferred; otherwise falls back to `potential_peers` (may have neither).
/// Returns `None` when no peer of either kind is known.
fn get_peer(&self, response_type: ResponseType) -> Option<PeerShouldHave> {
    // Both request states have the same shape, so select the relevant one
    // once instead of duplicating the peer-selection logic per response type.
    let request_state = match response_type {
        ResponseType::Block => &self.block_request_state,
        ResponseType::Blob => &self.blob_request_state,
    };
    request_state
        .available_peers
        .iter()
        .choose(&mut rand::thread_rng())
        .copied()
        .map(PeerShouldHave::BlockAndBlobs)
        .or(request_state
            .potential_peers
            .iter()
            .choose(&mut rand::thread_rng())
            .copied()
            .map(PeerShouldHave::Neither))
}
/// Records `peer_id` as used for the request selected by `response_type` and
/// moves that request into the `Downloading` state.
fn add_used_peer(&mut self, peer_id: PeerShouldHave, response_type: ResponseType) {
    // Deduplicated: both arms performed the same two operations on their
    // respective request state, so select the state once and act on it.
    let request_state = match response_type {
        ResponseType::Block => &mut self.block_request_state,
        ResponseType::Blob => &mut self.blob_request_state,
    };
    request_state.used_peers.insert(peer_id.to_peer_id());
    request_state.state = State::Downloading { peer_id };
}
pub fn add_peer_if_useful(
&mut self,
block_root: &Hash256,
@@ -444,6 +504,17 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
ResponseType::Blob => self.blob_request_state.peer(),
}
}
/// Returns `true` once both the block and the blob components have been
/// marked processed.
pub fn both_components_processed(&self) -> bool {
    // Bug fix: this previously tested `block_request_state.component_processed`
    // twice, so the blob flag was never consulted and the function returned
    // `true` as soon as only the block component had been processed.
    self.block_request_state.component_processed && self.blob_request_state.component_processed
}
/// Marks the component selected by `response_type` as processed.
pub fn set_component_processed(&mut self, response_type: ResponseType) {
    // Select the flag first, then set it — one assignment site.
    let component_processed = match response_type {
        ResponseType::Block => &mut self.block_request_state.component_processed,
        ResponseType::Blob => &mut self.blob_request_state.component_processed,
    };
    *component_processed = true;
}
}
impl<const MAX_ATTEMPTS: u8> SingleLookupRequestState<MAX_ATTEMPTS> {
@@ -585,12 +656,15 @@ mod tests {
use super::*;
use beacon_chain::builder::Witness;
use beacon_chain::eth1_chain::CachingEth1Backend;
use slog::Logger;
use sloggers::null::NullLoggerBuilder;
use sloggers::Build;
use slot_clock::{SlotClock, TestingSlotClock};
use std::time::Duration;
use store::MemoryStore;
use store::{HotColdDB, MemoryStore, StoreConfig};
use types::{
test_utils::{SeedableRng, TestRandom, XorShiftRng},
EthSpec, MinimalEthSpec as E, SignedBeaconBlock, Slot,
ChainSpec, EthSpec, MinimalEthSpec as E, SignedBeaconBlock, Slot,
};
fn rand_block() -> SignedBeaconBlock<E> {
@@ -614,7 +688,13 @@ mod tests {
Duration::from_secs(0),
Duration::from_secs(spec.seconds_per_slot),
);
let da_checker = Arc::new(DataAvailabilityChecker::new(slot_clock, None, spec));
let log = NullLoggerBuilder.build().expect("logger should build");
let store = HotColdDB::open_ephemeral(StoreConfig::default(), ChainSpec::minimal(), log)
.expect("store");
let da_checker = Arc::new(
DataAvailabilityChecker::new(slot_clock, None, store.into(), spec)
.expect("data availability checker"),
);
let mut sl =
SingleBlockLookup::<4, T>::new(block.canonical_root(), None, peer_id, da_checker);
sl.request_block().unwrap();
@@ -632,8 +712,14 @@ mod tests {
Duration::from_secs(0),
Duration::from_secs(spec.seconds_per_slot),
);
let log = NullLoggerBuilder.build().expect("logger should build");
let store = HotColdDB::open_ephemeral(StoreConfig::default(), ChainSpec::minimal(), log)
.expect("store");
let da_checker = Arc::new(DataAvailabilityChecker::new(slot_clock, None, spec));
let da_checker = Arc::new(
DataAvailabilityChecker::new(slot_clock, None, store.into(), spec)
.expect("data availability checker"),
);
let mut sl = SingleBlockLookup::<FAILURES, T>::new(
block.canonical_root(),

View File

@@ -1704,8 +1704,8 @@ mod deneb_only {
.block_response_triggering_process()
.invalid_block_processed()
.expect_penalty()
.expect_blobs_request()
.expect_block_request()
.expect_no_blobs_request()
.blobs_response()
.missing_components_from_blob_request()
.expect_no_penalty()
@@ -1726,7 +1726,7 @@ mod deneb_only {
.invalid_blob_processed()
.expect_penalty()
.expect_blobs_request()
.expect_no_block_request();
.expect_block_request();
}
#[test]
@@ -1881,8 +1881,8 @@ mod deneb_only {
.block_response_triggering_process()
.invalid_block_processed()
.expect_penalty()
.expect_blobs_request()
.expect_block_request()
.expect_no_blobs_request()
.blobs_response()
.missing_components_from_blob_request()
.expect_no_penalty()
@@ -1903,7 +1903,7 @@ mod deneb_only {
.invalid_blob_processed()
.expect_penalty()
.expect_blobs_request()
.expect_no_block_request();
.expect_block_request();
}
#[test]

View File

@@ -70,7 +70,7 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
/// Small enumeration to make dealing with block and blob requests easier.
pub enum BlockOrBlob<T: EthSpec> {
Block(Option<Arc<SignedBeaconBlock<T>>>),
Sidecar(Option<Arc<BlobSidecar<T>>>),
Blob(Option<Arc<BlobSidecar<T>>>),
}
impl<T: EthSpec> From<Option<Arc<SignedBeaconBlock<T>>>> for BlockOrBlob<T> {
@@ -81,7 +81,7 @@ impl<T: EthSpec> From<Option<Arc<SignedBeaconBlock<T>>>> for BlockOrBlob<T> {
impl<T: EthSpec> From<Option<Arc<BlobSidecar<T>>>> for BlockOrBlob<T> {
fn from(blob: Option<Arc<BlobSidecar<T>>>) -> Self {
BlockOrBlob::Sidecar(blob)
BlockOrBlob::Blob(blob)
}
}
@@ -312,7 +312,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
let info = &mut req.block_blob_info;
match block_or_blob {
BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block),
BlockOrBlob::Sidecar(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar),
BlockOrBlob::Blob(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar),
}
if info.is_finished() {
// If the request is finished, dequeue everything
@@ -389,7 +389,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
let (_, info) = entry.get_mut();
match block_or_blob {
BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block),
BlockOrBlob::Sidecar(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar),
BlockOrBlob::Blob(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar),
}
if info.is_finished() {
// If the request is finished, dequeue everything