Merge parent and current sync lookups (#5655)

* Drop lookup type trait for a simple arg

* Drop reconstructed for processing

* Send parent blocks one by one

* Merge current and parent lookups

* Merge current and parent lookups clean up todos

* Merge current and parent lookups tests

* Merge remote-tracking branch 'origin/unstable' into sync-merged-lookup

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into sync-merged-lookup

* fix compile after merge

* #5655 pr review (#26)

* fix compile after merge

* remove todos, fix typos etc

* fix compile

* stable rng

* delete TODO and unfilled out test

* make download result a struct

* enums instead of bools as params

* fix comment

* Various fixes

* Track ignored child components

* Track dropped lookup reason as metric

* fix test

* add comment describing behavior of avail check error

* update ordering
This commit is contained in:
Lion - dapplion
2024-05-01 05:12:15 +09:00
committed by GitHub
parent 196d9fd110
commit ce66582c16
17 changed files with 1633 additions and 2503 deletions

View File

@@ -1,21 +1,21 @@
use crate::sync::block_lookups::parent_lookup::PARENT_FAIL_TOLERANCE;
use crate::sync::block_lookups::single_block_lookup::{
LookupRequestError, SingleBlockLookup, SingleLookupRequestState,
};
use crate::sync::block_lookups::{
BlobRequestState, BlockLookups, BlockRequestState, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS,
BlobRequestState, BlockRequestState, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS,
};
use crate::sync::manager::{BlockProcessType, Id, SingleLookupReqId};
use crate::sync::manager::{BlockProcessType, Id, SLOT_IMPORT_TOLERANCE};
use crate::sync::network_context::{
BlobsByRootSingleBlockRequest, BlocksByRootSingleRequest, SyncNetworkContext,
};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::data_availability_checker::ChildComponents;
use beacon_chain::BeaconChainTypes;
use std::sync::Arc;
use std::time::Duration;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{Hash256, SignedBeaconBlock};
use types::SignedBeaconBlock;
use super::single_block_lookup::DownloadResult;
use super::SingleLookupId;
#[derive(Debug, Copy, Clone)]
pub enum ResponseType {
@@ -23,20 +23,15 @@ pub enum ResponseType {
Blob,
}
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub enum LookupType {
Current,
Parent,
}
/// The maximum depth we will search for a parent block. In principle we should have sync'd any
canonical chain to its head once the peer connects. A chain should not appear where its depth
/// is further back than the most recent head slot.
pub(crate) const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE * 2;
impl LookupType {
fn max_attempts(&self) -> u8 {
match self {
LookupType::Current => SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS,
LookupType::Parent => PARENT_FAIL_TOLERANCE,
}
}
}
/// Newtype wrapper around bool to prevent mixing this argument with `BlockIsProcessed`
/// when both are passed positionally to the same function.
pub(crate) struct AwaitingParent(pub bool);

/// Newtype wrapper around bool to prevent mixing this argument with `AwaitingParent`
/// when both are passed positionally to the same function.
pub(crate) struct BlockIsProcessed(pub bool);
/// This trait unifies common single block lookup functionality across blocks and blobs. This
/// includes making requests, verifying responses, and handling processing results. A
@@ -53,115 +48,68 @@ pub trait RequestState<T: BeaconChainTypes> {
/// The type created after validation.
type VerifiedResponseType: Clone;
/// We convert a `VerifiedResponseType` to this type prior to sending it to the beacon processor.
type ReconstructedResponseType;
/* Request building methods */
/// Construct a new request.
fn build_request(
&mut self,
lookup_type: LookupType,
) -> Result<(PeerId, Self::RequestType), LookupRequestError> {
// Verify and construct request.
self.too_many_attempts(lookup_type)?;
let peer = self.get_peer()?;
let request = self.new_request();
Ok((peer, request))
}
/// Construct a new request and send it.
fn build_request_and_send(
/// Potentially makes progress on this request if it's in a progress-able state
fn continue_request(
&mut self,
id: Id,
lookup_type: LookupType,
awaiting_parent: AwaitingParent,
downloaded_block_expected_blobs: Option<usize>,
block_is_processed: BlockIsProcessed,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
// Check if request is necessary.
if !self.get_state().is_awaiting_download() {
return Ok(());
// Attempt to progress awaiting downloads
if self.get_state().is_awaiting_download() {
// Verify the current request has not exceeded the maximum number of attempts.
let request_state = self.get_state();
if request_state.failed_attempts() >= SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS {
let cannot_process = request_state.more_failed_processing_attempts();
return Err(LookupRequestError::TooManyAttempts { cannot_process });
}
let peer_id = self
.get_state_mut()
.use_rand_available_peer()
.ok_or(LookupRequestError::NoPeers)?;
// make_request returns true only if a request was made
if self.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? {
self.get_state_mut().on_download_start()?;
}
// Otherwise, attempt to progress awaiting processing
// If this request is awaiting a parent lookup to be processed, do not send for processing.
// The request will be rejected with unknown parent error.
} else if !awaiting_parent.0
&& (block_is_processed.0 || matches!(Self::response_type(), ResponseType::Block))
{
// maybe_start_processing returns Some if state == AwaitingProcess. This pattern is
// useful to conditionally access the result data.
if let Some(result) = self.get_state_mut().maybe_start_processing() {
return Self::send_for_processing(id, result, cx);
}
}
// Construct request.
let (peer_id, request) = self.build_request(lookup_type)?;
// Update request state.
let req_counter = self.get_state_mut().on_download_start(peer_id);
// Make request
let id = SingleLookupReqId {
id,
req_counter,
lookup_type,
};
Self::make_request(id, peer_id, request, cx)
Ok(())
}
/// Verify the current request has not exceeded the maximum number of attempts.
fn too_many_attempts(&self, lookup_type: LookupType) -> Result<(), LookupRequestError> {
let request_state = self.get_state();
if request_state.failed_attempts() >= lookup_type.max_attempts() {
let cannot_process = request_state.more_failed_processing_attempts();
Err(LookupRequestError::TooManyAttempts { cannot_process })
} else {
Ok(())
}
}
/// Get the next peer to request. Draws from the set of peers we think should have both the
/// block and blob first. If that fails, we draw from the set of peers that may have either.
fn get_peer(&mut self) -> Result<PeerId, LookupRequestError> {
self.get_state_mut()
.use_rand_available_peer()
.ok_or(LookupRequestError::NoPeers)
}
/// Initialize `Self::RequestType`.
fn new_request(&self) -> Self::RequestType;
/// Send the request to the network service.
fn make_request(
id: SingleLookupReqId,
&self,
id: Id,
peer_id: PeerId,
request: Self::RequestType,
downloaded_block_expected_blobs: Option<usize>,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), LookupRequestError>;
) -> Result<bool, LookupRequestError>;
/* Response handling methods */
/// A getter for the parent root of the response. Returns an `Option` because we won't know
/// the blob parent if we don't end up getting any blobs in the response.
fn get_parent_root(verified_response: &Self::VerifiedResponseType) -> Option<Hash256>;
/// Caches the verified response in the lookup if necessary. This is only necessary for lookups
/// triggered by `UnknownParent` errors.
fn add_to_child_components(
verified_response: Self::VerifiedResponseType,
components: &mut ChildComponents<T::EthSpec>,
);
/// Convert a verified response to the type we send to the beacon processor.
fn verified_to_reconstructed(
block_root: Hash256,
verified: Self::VerifiedResponseType,
) -> Self::ReconstructedResponseType;
/// Send the response to the beacon processor.
fn send_reconstructed_for_processing(
fn send_for_processing(
id: Id,
bl: &BlockLookups<T>,
block_root: Hash256,
verified: Self::ReconstructedResponseType,
duration: Duration,
result: DownloadResult<Self::VerifiedResponseType>,
cx: &SyncNetworkContext<T>,
) -> Result<(), LookupRequestError>;
/// Register a failure to process the block or blob.
fn register_failure_downloading(&mut self) {
self.get_state_mut().on_download_failure()
}
/* Utility methods */
/// Returns the `ResponseType` associated with this trait implementation. Useful in logging.
@@ -171,64 +119,49 @@ pub trait RequestState<T: BeaconChainTypes> {
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> &mut Self;
/// A getter for a reference to the `SingleLookupRequestState` associated with this trait.
fn get_state(&self) -> &SingleLookupRequestState;
fn get_state(&self) -> &SingleLookupRequestState<Self::VerifiedResponseType>;
/// A getter for a mutable reference to the SingleLookupRequestState associated with this trait.
fn get_state_mut(&mut self) -> &mut SingleLookupRequestState;
fn get_state_mut(&mut self) -> &mut SingleLookupRequestState<Self::VerifiedResponseType>;
}
impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState {
impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
type RequestType = BlocksByRootSingleRequest;
type VerifiedResponseType = Arc<SignedBeaconBlock<T::EthSpec>>;
type ReconstructedResponseType = RpcBlock<T::EthSpec>;
fn new_request(&self) -> Self::RequestType {
BlocksByRootSingleRequest(self.requested_block_root)
}
fn make_request(
id: SingleLookupReqId,
&self,
id: SingleLookupId,
peer_id: PeerId,
request: Self::RequestType,
_: Option<usize>,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
cx.block_lookup_request(id, peer_id, request)
.map_err(LookupRequestError::SendFailed)
) -> Result<bool, LookupRequestError> {
cx.block_lookup_request(
id,
peer_id,
BlocksByRootSingleRequest(self.requested_block_root),
)
.map_err(LookupRequestError::SendFailed)
}
fn get_parent_root(verified_response: &Arc<SignedBeaconBlock<T::EthSpec>>) -> Option<Hash256> {
Some(verified_response.parent_root())
}
fn add_to_child_components(
verified_response: Arc<SignedBeaconBlock<T::EthSpec>>,
components: &mut ChildComponents<T::EthSpec>,
) {
components.merge_block(verified_response);
}
fn verified_to_reconstructed(
block_root: Hash256,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
) -> RpcBlock<T::EthSpec> {
RpcBlock::new_without_blobs(Some(block_root), block)
}
fn send_reconstructed_for_processing(
id: Id,
bl: &BlockLookups<T>,
block_root: Hash256,
constructed: RpcBlock<T::EthSpec>,
duration: Duration,
fn send_for_processing(
id: SingleLookupId,
download_result: DownloadResult<Self::VerifiedResponseType>,
cx: &SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
bl.send_block_for_processing(
let DownloadResult {
value,
block_root,
constructed,
duration,
seen_timestamp,
peer_id: _,
} = download_result;
cx.send_block_for_processing(
block_root,
RpcBlock::new_without_blobs(Some(block_root), value),
seen_timestamp,
BlockProcessType::SingleBlock { id },
cx,
)
.map_err(LookupRequestError::SendFailed)
}
fn response_type() -> ResponseType {
@@ -237,73 +170,52 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState {
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> &mut Self {
&mut request.block_request_state
}
fn get_state(&self) -> &SingleLookupRequestState {
fn get_state(&self) -> &SingleLookupRequestState<Self::VerifiedResponseType> {
&self.state
}
fn get_state_mut(&mut self) -> &mut SingleLookupRequestState {
fn get_state_mut(&mut self) -> &mut SingleLookupRequestState<Self::VerifiedResponseType> {
&mut self.state
}
}
impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState {
impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
type RequestType = BlobsByRootSingleBlockRequest;
type VerifiedResponseType = FixedBlobSidecarList<T::EthSpec>;
type ReconstructedResponseType = FixedBlobSidecarList<T::EthSpec>;
fn new_request(&self) -> Self::RequestType {
BlobsByRootSingleBlockRequest {
block_root: self.block_root,
indices: self.requested_ids.indices(),
}
}
fn make_request(
id: SingleLookupReqId,
peer_id: PeerId,
request: Self::RequestType,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
cx.blob_lookup_request(id, peer_id, request)
.map_err(LookupRequestError::SendFailed)
}
fn get_parent_root(verified_response: &FixedBlobSidecarList<T::EthSpec>) -> Option<Hash256> {
verified_response
.into_iter()
.filter_map(|blob| blob.as_ref())
.map(|blob| blob.block_parent_root())
.next()
}
fn add_to_child_components(
verified_response: FixedBlobSidecarList<T::EthSpec>,
components: &mut ChildComponents<T::EthSpec>,
) {
components.merge_blobs(verified_response);
}
fn verified_to_reconstructed(
_block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
) -> FixedBlobSidecarList<T::EthSpec> {
blobs
}
fn send_reconstructed_for_processing(
&self,
id: Id,
bl: &BlockLookups<T>,
block_root: Hash256,
verified: FixedBlobSidecarList<T::EthSpec>,
duration: Duration,
peer_id: PeerId,
downloaded_block_expected_blobs: Option<usize>,
cx: &mut SyncNetworkContext<T>,
) -> Result<bool, LookupRequestError> {
cx.blob_lookup_request(
id,
peer_id,
self.block_root,
downloaded_block_expected_blobs,
)
.map_err(LookupRequestError::SendFailed)
}
fn send_for_processing(
id: Id,
download_result: DownloadResult<Self::VerifiedResponseType>,
cx: &SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
bl.send_blobs_for_processing(
let DownloadResult {
value,
block_root,
verified,
duration,
seen_timestamp,
peer_id: _,
} = download_result;
cx.send_blobs_for_processing(
block_root,
value,
seen_timestamp,
BlockProcessType::SingleBlob { id },
cx,
)
.map_err(LookupRequestError::SendFailed)
}
fn response_type() -> ResponseType {
@@ -312,10 +224,10 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState {
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> &mut Self {
&mut request.blob_request_state
}
fn get_state(&self) -> &SingleLookupRequestState {
fn get_state(&self) -> &SingleLookupRequestState<Self::VerifiedResponseType> {
&self.state
}
fn get_state_mut(&mut self) -> &mut SingleLookupRequestState {
fn get_state_mut(&mut self) -> &mut SingleLookupRequestState<Self::VerifiedResponseType> {
&mut self.state
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,198 @@
use super::single_block_lookup::SingleBlockLookup;
use beacon_chain::BeaconChainTypes;
use std::collections::{HashMap, HashSet};
use types::Hash256;
/// Summary of a lookup whose parent_root we may not know yet
pub(crate) struct Node {
    /// Root of the block this lookup targets
    block_root: Hash256,
    /// Parent root of the block, if known at the time the summary was taken
    parent_root: Option<Hash256>,
}
impl<T: BeaconChainTypes> From<&SingleBlockLookup<T>> for Node {
fn from(value: &SingleBlockLookup<T>) -> Self {
Self {
block_root: value.block_root(),
parent_root: value.awaiting_parent(),
}
}
}
/// Wrapper around a chain of block roots that has at least one element (tip)
pub(crate) struct NodeChain {
    // Parent chain blocks in descending slot order
    pub(crate) chain: Vec<Hash256>,
    // Newest block of the chain (the block with no children in the node set)
    pub(crate) tip: Hash256,
}
impl NodeChain {
    /// Returns the block_root of the oldest ancestor (min slot) of this chain.
    /// Falls back to the tip when the chain vector is empty.
    pub(crate) fn ancestor(&self) -> Hash256 {
        match self.chain.last() {
            Some(oldest) => *oldest,
            None => self.tip,
        }
    }

    /// Number of block roots tracked in the descending-order chain vector.
    pub(crate) fn len(&self) -> usize {
        self.chain.len()
    }
}
/// Given a set of nodes that reference each other, returns a list of chains with unique tips that
/// contain at least two elements. In descending slot order (tip first).
pub(crate) fn compute_parent_chains(nodes: &[Node]) -> Vec<NodeChain> {
    // Index the node set both ways: child -> parent (if known) and parent -> children.
    let mut child_to_parent = HashMap::new();
    let mut parent_to_child = HashMap::<Hash256, Vec<Hash256>>::new();
    for node in nodes {
        child_to_parent.insert(node.block_root, node.parent_root);
        if let Some(parent_root) = node.parent_root {
            parent_to_child
                .entry(parent_root)
                .or_default()
                .push(node.block_root);
        }
    }

    let mut parent_chains = vec![];

    // Iterate blocks with no children: each such block is the tip of a chain
    for tip in nodes {
        let mut block_root = tip.block_root;
        // Idiom fix: `!contains_key` instead of `get(..).is_none()` (clippy)
        if !parent_to_child.contains_key(&block_root) {
            let mut chain = vec![];

            // Resolve chain of blocks by walking known ancestors
            while let Some(parent_root) = child_to_parent.get(&block_root) {
                // block_root is a known block that may or may not have a parent root
                chain.push(block_root);
                if let Some(parent_root) = parent_root {
                    block_root = *parent_root;
                } else {
                    break;
                }
            }

            // Single-element chains (no known parent in the set) are not reported
            if chain.len() > 1 {
                parent_chains.push(NodeChain {
                    chain,
                    tip: tip.block_root,
                });
            }
        }
    }
    parent_chains
}
/// Given a list of node chains, find the oldest node of a specific chain that is not contained in
/// any other chain.
pub(crate) fn find_oldest_fork_ancestor(
    parent_chains: Vec<NodeChain>,
    chain_idx: usize,
) -> Result<Hash256, &'static str> {
    // Register every block root that belongs to a chain other than the target one
    let mut other_blocks = HashSet::new();
    for (i, parent_chain) in parent_chains.iter().enumerate() {
        if i != chain_idx {
            other_blocks.extend(parent_chain.chain.iter());
        }
    }

    // Should never happen
    let parent_chain = parent_chains
        .get(chain_idx)
        .ok_or("chain_idx out of bounds")?;

    // Walk the target chain in ascending slot order (oldest first) and return the
    // first block that no other chain contains. No match means the chain is fully
    // contained within another chain; this should never happen, but in that case
    // just return the tip.
    match parent_chain
        .chain
        .iter()
        .rev()
        .find(|block| !other_blocks.contains(*block))
    {
        Some(oldest_unique) => Ok(*oldest_unique),
        None => Ok(parent_chain.tip),
    }
}
#[cfg(test)]
mod tests {
    use super::{compute_parent_chains, find_oldest_fork_ancestor, Node};
    use types::Hash256;

    /// Shorthand: deterministic Hash256 from a u64 (big-endian low bytes).
    fn h(n: u64) -> Hash256 {
        Hash256::from_low_u64_be(n)
    }

    /// Node with no known parent.
    fn n(block: u64) -> Node {
        Node {
            block_root: h(block),
            parent_root: None,
        }
    }

    /// Node `block` whose parent root `parent` is known.
    fn np(parent: u64, block: u64) -> Node {
        Node {
            block_root: h(block),
            parent_root: Some(h(parent)),
        }
    }

    /// Asserts that `compute_parent_chains` over `nodes` yields exactly
    /// `expected_chain` (chains compared in order, each tip-first).
    fn compute_parent_chains_test(nodes: &[Node], expected_chain: Vec<Vec<Hash256>>) {
        assert_eq!(
            compute_parent_chains(nodes)
                .iter()
                .map(|c| c.chain.clone())
                .collect::<Vec<_>>(),
            expected_chain
        );
    }

    /// Asserts the oldest fork ancestor of chain 0 computed from `nodes`
    /// equals `expected`. Prints the chains to ease debugging on failure.
    fn find_oldest_fork_ancestor_test(nodes: &[Node], expected: Hash256) {
        let chains = compute_parent_chains(nodes);
        println!(
            "chains {:?}",
            chains.iter().map(|c| &c.chain).collect::<Vec<_>>()
        );
        assert_eq!(find_oldest_fork_ancestor(chains, 0).unwrap(), expected);
    }

    #[test]
    fn compute_parent_chains_empty_case() {
        compute_parent_chains_test(&[], vec![]);
    }

    #[test]
    fn compute_parent_chains_single_branch() {
        compute_parent_chains_test(&[n(0), np(0, 1), np(1, 2)], vec![vec![h(2), h(1), h(0)]]);
    }

    #[test]
    fn compute_parent_chains_single_branch_with_solo() {
        // The solo node np(3, 4) forms a single-element chain and is dropped
        compute_parent_chains_test(
            &[n(0), np(0, 1), np(1, 2), np(3, 4)],
            vec![vec![h(2), h(1), h(0)]],
        );
    }

    #[test]
    fn compute_parent_chains_two_forking_branches() {
        compute_parent_chains_test(
            &[n(0), np(0, 1), np(1, 2), np(1, 3)],
            vec![vec![h(2), h(1), h(0)], vec![h(3), h(1), h(0)]],
        );
    }

    #[test]
    fn compute_parent_chains_two_independent_branches() {
        compute_parent_chains_test(
            &[n(0), np(0, 1), np(1, 2), n(3), np(3, 4)],
            vec![vec![h(2), h(1), h(0)], vec![h(4), h(3)]],
        );
    }

    #[test]
    fn find_oldest_fork_ancestor_simple_case() {
        find_oldest_fork_ancestor_test(&[n(0), np(0, 1), np(1, 2), np(0, 3)], h(1))
    }
}

View File

@@ -1,227 +0,0 @@
use super::common::LookupType;
use super::single_block_lookup::{LookupRequestError, SingleBlockLookup};
use super::{DownloadedBlock, PeerId};
use crate::sync::{manager::SLOT_IMPORT_TOLERANCE, network_context::SyncNetworkContext};
use beacon_chain::block_verification_types::AsBlock;
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::data_availability_checker::{ChildComponents, DataAvailabilityChecker};
use beacon_chain::BeaconChainTypes;
use std::collections::VecDeque;
use std::sync::Arc;
use store::Hash256;
/// How many attempts we try to find a parent of a block before we give up trying.
pub(crate) const PARENT_FAIL_TOLERANCE: u8 = 5;

/// The maximum depth we will search for a parent block. In principle we should have sync'd any
/// canonical chain to its head once the peer connects. A chain should not appear where its depth
/// is further back than the most recent head slot.
pub(crate) const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE * 2;
/// Maintains a sequential list of parents to lookup and the lookup's current state.
pub(crate) struct ParentLookup<T: BeaconChainTypes> {
    /// The root of the block triggering this parent request.
    chain_hash: Hash256,
    /// The blocks that have currently been downloaded, stored as (block_root, block) pairs.
    downloaded_blocks: Vec<DownloadedBlock<T::EthSpec>>,
    /// Request of the last parent (the one currently being looked up).
    pub current_parent_request: SingleBlockLookup<T>,
}
/// Errors that can abort a parent lookup.
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum RequestError {
    /// Sending the request over the network failed; carries the failure reason.
    SendFailed(&'static str),
    /// The parent chain grew beyond `PARENT_DEPTH_TOLERANCE`.
    ChainTooLong,
    /// We witnessed too many failures trying to complete this parent lookup.
    TooManyAttempts {
        /// We received more failures trying to process the blocks than downloading them
        /// from peers.
        cannot_process: bool,
    },
    /// No peers are available to serve this request.
    NoPeers,
    /// The lookup was in an unexpected internal state; carries a description.
    BadState(String),
}
impl<T: BeaconChainTypes> ParentLookup<T> {
    /// Creates a new parent lookup for `parent_root`, triggered by the child block
    /// `block_root` received from `peer_id`.
    pub fn new(
        block_root: Hash256,
        parent_root: Hash256,
        peer_id: PeerId,
        da_checker: Arc<DataAvailabilityChecker<T>>,
        cx: &mut SyncNetworkContext<T>,
    ) -> Self {
        // Seed the child components cache with the triggering block's root
        let current_parent_request = SingleBlockLookup::new(
            parent_root,
            Some(ChildComponents::empty(block_root)),
            &[peer_id],
            da_checker,
            cx.next_id(),
            LookupType::Parent,
        );

        Self {
            chain_hash: block_root,
            downloaded_blocks: vec![],
            current_parent_request,
        }
    }

    /// Returns true if `block_root` has already been downloaded by this lookup.
    pub fn contains_block(&self, block_root: &Hash256) -> bool {
        self.downloaded_blocks
            .iter()
            .any(|(root, _d_block)| root == block_root)
    }

    /// Returns true if the in-flight parent request targets `block_root`.
    pub fn is_for_block(&self, block_root: Hash256) -> bool {
        self.current_parent_request.is_for_block(block_root)
    }

    /// Attempts to request the next unknown parent. If the request fails, it should be removed.
    pub fn request_parent(&mut self, cx: &mut SyncNetworkContext<T>) -> Result<(), RequestError> {
        // check to make sure this request hasn't failed
        if self.downloaded_blocks.len() + 1 >= PARENT_DEPTH_TOLERANCE {
            return Err(RequestError::ChainTooLong);
        }

        self.current_parent_request
            .request_block_and_blobs(cx)
            .map_err(Into::into)
    }

    /// Propagates a peer-disconnect notification to both the block and blob
    /// request states.
    pub fn check_peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), ()> {
        self.current_parent_request
            .block_request_state
            .state
            .check_peer_disconnected(peer_id)
            .and_then(|()| {
                self.current_parent_request
                    .blob_request_state
                    .state
                    .check_peer_disconnected(peer_id)
            })
    }

    /// Caches a downloaded block whose parent is still unknown, then re-targets
    /// the current request at that parent.
    pub fn add_unknown_parent_block(&mut self, block: RpcBlock<T::EthSpec>) {
        let next_parent = block.parent_root();
        // Cache the block.
        let current_root = self.current_parent_request.block_root();
        self.downloaded_blocks.push((current_root, block));

        // Update the parent request.
        self.current_parent_request
            .update_requested_parent_block(next_parent)
    }

    /// Peer attributed to the block currently being processed.
    pub fn block_processing_peer(&self) -> Result<PeerId, String> {
        self.current_parent_request
            .block_request_state
            .state
            .processing_peer()
    }

    /// Peer attributed to the blobs currently being processed.
    pub fn blob_processing_peer(&self) -> Result<PeerId, String> {
        self.current_parent_request
            .blob_request_state
            .state
            .processing_peer()
    }

    /// Consumes the parent request and destructures it into its parts.
    #[allow(clippy::type_complexity)]
    pub fn parts_for_processing(
        self,
    ) -> (
        Hash256,
        VecDeque<RpcBlock<T::EthSpec>>,
        Vec<Hash256>,
        SingleBlockLookup<T>,
    ) {
        let ParentLookup {
            chain_hash,
            downloaded_blocks,
            current_parent_request,
        } = self;
        let block_count = downloaded_blocks.len();
        let mut blocks = VecDeque::with_capacity(block_count);
        let mut hashes = Vec::with_capacity(block_count);
        // Split the (root, block) pairs into parallel collections
        for (hash, block) in downloaded_blocks.into_iter() {
            blocks.push_back(block);
            hashes.push(hash);
        }
        (chain_hash, blocks, hashes, current_parent_request)
    }

    /// Get the parent lookup's chain hash.
    pub fn chain_hash(&self) -> Hash256 {
        self.chain_hash
    }

    /// Registers a processing failure on both block and blob states and clears
    /// any cached child components.
    pub fn processing_failed(&mut self) {
        self.current_parent_request
            .block_request_state
            .state
            .on_processing_failure();
        self.current_parent_request
            .blob_request_state
            .state
            .on_processing_failure();
        if let Some(components) = self.current_parent_request.child_components.as_mut() {
            components.downloaded_block = None;
            components.downloaded_blobs = <_>::default();
        }
    }

    /// Adds `peer` as a candidate for the current parent request.
    pub fn add_peer(&mut self, peer: PeerId) {
        self.current_parent_request.add_peer(peer)
    }

    /// Adds a list of peers to the parent request.
    pub fn add_peers(&mut self, peers: &[PeerId]) {
        self.current_parent_request.add_peers(peers)
    }

    /// Iterator over all peers used by either the block or blob request.
    pub fn all_used_peers(&self) -> impl Iterator<Item = &PeerId> + '_ {
        self.current_parent_request.all_used_peers()
    }
}
impl From<LookupRequestError> for RequestError {
fn from(e: LookupRequestError) -> Self {
use LookupRequestError as E;
match e {
E::TooManyAttempts { cannot_process } => {
RequestError::TooManyAttempts { cannot_process }
}
E::NoPeers => RequestError::NoPeers,
E::SendFailed(msg) => RequestError::SendFailed(msg),
E::BadState(msg) => RequestError::BadState(msg),
}
}
}
/// Structured-logging support: emits the chain hash, the current parent lookup
/// (under the "parent" key), and the count of downloaded blocks.
impl<T: BeaconChainTypes> slog::KV for ParentLookup<T> {
    fn serialize(
        &self,
        record: &slog::Record,
        serializer: &mut dyn slog::Serializer,
    ) -> slog::Result {
        serializer.emit_arguments("chain_hash", &format_args!("{}", self.chain_hash))?;
        slog::Value::serialize(&self.current_parent_request, record, "parent", serializer)?;
        serializer.emit_usize("downloaded_blocks", self.downloaded_blocks.len())?;
        slog::Result::Ok(())
    }
}
impl RequestError {
    /// Static string label for this error, suitable for metrics and logging.
    pub fn as_static(&self) -> &'static str {
        match self {
            RequestError::SendFailed(e) => e,
            RequestError::ChainTooLong => "chain_too_long",
            RequestError::TooManyAttempts {
                cannot_process: true,
            } => "too_many_processing_attempts",
            RequestError::TooManyAttempts {
                cannot_process: false,
            } => "too_many_downloading_attempts",
            RequestError::NoPeers => "no_peers",
            RequestError::BadState(..) => "bad_state",
        }
    }
}

View File

@@ -1,24 +1,19 @@
use super::common::LookupType;
use super::PeerId;
use super::common::{AwaitingParent, BlockIsProcessed};
use super::{BlockComponent, PeerId};
use crate::sync::block_lookups::common::RequestState;
use crate::sync::block_lookups::Id;
use crate::sync::network_context::SyncNetworkContext;
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::data_availability_checker::ChildComponents;
use beacon_chain::data_availability_checker::{
AvailabilityCheckError, DataAvailabilityChecker, MissingBlobs,
};
use beacon_chain::BeaconChainTypes;
use itertools::Itertools;
use lighthouse_network::PeerAction;
use rand::seq::IteratorRandom;
use slog::{debug, Logger};
use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
use store::Hash256;
use strum::IntoStaticStr;
use types::EthSpec;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{EthSpec, SignedBeaconBlock};
#[derive(Debug, PartialEq, Eq, IntoStaticStr)]
pub enum LookupRequestError {
@@ -34,38 +29,64 @@ pub enum LookupRequestError {
pub struct SingleBlockLookup<T: BeaconChainTypes> {
pub id: Id,
pub lookup_type: LookupType,
pub block_request_state: BlockRequestState,
pub blob_request_state: BlobRequestState,
pub da_checker: Arc<DataAvailabilityChecker<T>>,
/// Only necessary for requests triggered by an `UnknownBlockParent` or `UnknownBlockParent`
/// because any blocks or blobs without parents won't hit the data availability cache.
pub child_components: Option<ChildComponents<T::EthSpec>>,
pub block_request_state: BlockRequestState<T::EthSpec>,
pub blob_request_state: BlobRequestState<T::EthSpec>,
block_root: Hash256,
awaiting_parent: Option<Hash256>,
}
impl<T: BeaconChainTypes> SingleBlockLookup<T> {
pub fn new(
requested_block_root: Hash256,
child_components: Option<ChildComponents<T::EthSpec>>,
peers: &[PeerId],
da_checker: Arc<DataAvailabilityChecker<T>>,
id: Id,
lookup_type: LookupType,
awaiting_parent: Option<Hash256>,
) -> Self {
let is_deneb = da_checker.is_deneb();
Self {
id,
lookup_type,
block_request_state: BlockRequestState::new(requested_block_root, peers),
blob_request_state: BlobRequestState::new(requested_block_root, peers, is_deneb),
da_checker,
child_components,
blob_request_state: BlobRequestState::new(requested_block_root, peers),
block_root: requested_block_root,
awaiting_parent,
}
}
/// Get the block root that is being requested.
pub fn block_root(&self) -> Hash256 {
self.block_request_state.requested_block_root
self.block_root
}
pub fn awaiting_parent(&self) -> Option<Hash256> {
self.awaiting_parent
}
/// Mark this lookup as awaiting a parent lookup from being processed. Meanwhile don't send
/// components for processing.
pub fn set_awaiting_parent(&mut self, parent_root: Hash256) {
self.awaiting_parent = Some(parent_root)
}
/// Mark this lookup as no longer awaiting a parent lookup. Components can be sent for
/// processing.
pub fn resolve_awaiting_parent(&mut self) {
self.awaiting_parent = None;
}
/// Maybe insert a verified response into this lookup. Returns true if imported
pub fn add_child_components(&mut self, block_component: BlockComponent<T::EthSpec>) -> bool {
match block_component {
BlockComponent::Block(block) => self
.block_request_state
.state
.insert_verified_response(block),
BlockComponent::Blob(_) => {
// For now ignore single blobs, as the blob request state assumes all blobs are
// attributed to the same peer = the peer serving the remaining blobs. Ignoring this
// block component has a minor effect, causing the node to re-request this blob
// once the parent chain is successfully resolved
false
}
}
}
/// Check the block root matches the requested block root.
@@ -73,16 +94,6 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
self.block_root() == block_root
}
/// Update the requested block, this should only be used in a chain of parent lookups to request
/// the next parent.
pub fn update_requested_parent_block(&mut self, block_root: Hash256) {
self.block_request_state.requested_block_root = block_root;
self.blob_request_state.block_root = block_root;
self.block_request_state.state.state = State::AwaitingDownload;
self.blob_request_state.state.state = State::AwaitingDownload;
self.child_components = Some(ChildComponents::empty(block_root));
}
/// Get all unique used peers across block and blob requests.
pub fn all_used_peers(&self) -> impl Iterator<Item = &PeerId> + '_ {
self.block_request_state
@@ -92,87 +103,44 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
.unique()
}
/// Send the necessary requests for blocks and/or blobs. This will check whether we have
/// downloaded the block and/or blobs already and will not send requests if so. It will also
/// inspect the request state or blocks and blobs to ensure we are not already processing or
/// downloading the block and/or blobs.
pub fn request_block_and_blobs(
/// Get all unique available peers across block and blob requests.
pub fn all_available_peers(&self) -> impl Iterator<Item = &PeerId> + '_ {
self.block_request_state
.state
.get_available_peers()
.chain(self.blob_request_state.state.get_available_peers())
.unique()
}
pub fn continue_requests(
&mut self,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
let block_already_downloaded = self.block_already_downloaded();
let blobs_already_downloaded = self.blobs_already_downloaded();
if !block_already_downloaded {
self.block_request_state
.build_request_and_send(self.id, self.lookup_type, cx)?;
}
if !blobs_already_downloaded {
self.blob_request_state
.build_request_and_send(self.id, self.lookup_type, cx)?;
}
// TODO: Check what's necessary to download, specially for blobs
self.continue_request::<BlockRequestState<T::EthSpec>>(cx)?;
self.continue_request::<BlobRequestState<T::EthSpec>>(cx)?;
Ok(())
}
/// Returns a `CachedChild`, which is a wrapper around a `RpcBlock` that is either:
///
/// 1. `NotRequired`: there is no child caching required for this lookup.
/// 2. `DownloadIncomplete`: Child caching is required, but all components are not yet downloaded.
/// 3. `Ok`: The child is required and we have downloaded it.
/// 4. `Err`: The child is required, but has failed consistency checks.
pub fn get_cached_child_block(&self) -> CachedChild<T::EthSpec> {
if let Some(components) = self.child_components.as_ref() {
let Some(block) = components.downloaded_block.as_ref() else {
return CachedChild::DownloadIncomplete;
};
if !self.missing_blob_ids().is_empty() {
return CachedChild::DownloadIncomplete;
}
match RpcBlock::new_from_fixed(
self.block_request_state.requested_block_root,
block.clone(),
components.downloaded_blobs.clone(),
) {
Ok(rpc_block) => CachedChild::Ok(rpc_block),
Err(e) => CachedChild::Err(e),
}
} else {
CachedChild::NotRequired
}
}
/// Accepts a verified response, and adds it to the child components if required. This method
/// returns a `CachedChild` which provides a completed block + blob response if all components have been
/// received, or information about whether the child is required and if it has been downloaded.
pub fn add_response<R: RequestState<T>>(
pub fn continue_request<R: RequestState<T>>(
&mut self,
verified_response: R::VerifiedResponseType,
) -> CachedChild<T::EthSpec> {
if let Some(child_components) = self.child_components.as_mut() {
R::add_to_child_components(verified_response, child_components);
self.get_cached_child_block()
} else {
CachedChild::NotRequired
}
}
/// Add a child component to the lookup request. Merges with any existing child components.
pub fn add_child_components(&mut self, components: ChildComponents<T::EthSpec>) {
if let Some(ref mut existing_components) = self.child_components {
let ChildComponents {
block_root: _,
downloaded_block,
downloaded_blobs,
} = components;
if let Some(block) = downloaded_block {
existing_components.merge_block(block);
}
existing_components.merge_blobs(downloaded_blobs);
} else {
self.child_components = Some(components);
}
cx: &mut SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
let id = self.id;
let awaiting_parent = self.awaiting_parent.is_some();
let downloaded_block_expected_blobs = self
.block_request_state
.state
.peek_downloaded_data()
.map(|block| block.num_expected_blobs());
let block_is_processed = self.block_request_state.state.is_processed();
R::request_state_mut(self).continue_request(
id,
AwaitingParent(awaiting_parent),
downloaded_block_expected_blobs,
BlockIsProcessed(block_is_processed),
cx,
)
}
/// Add all given peers to both block and blob request states.
@@ -188,12 +156,6 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
}
}
/// Returns true if the block has already been downloaded.
pub fn both_components_downloaded(&self) -> bool {
self.block_request_state.state.is_downloaded()
&& self.blob_request_state.state.is_downloaded()
}
/// Returns true if the block has already been downloaded.
pub fn both_components_processed(&self) -> bool {
self.block_request_state.state.is_processed()
@@ -203,133 +165,43 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
/// Checks both the block and blob request states to see if the peer is disconnected.
///
/// Returns true if the lookup should be dropped.
pub fn should_drop_lookup_on_disconnected_peer(
&mut self,
peer_id: &PeerId,
cx: &mut SyncNetworkContext<T>,
log: &Logger,
) -> bool {
let block_root = self.block_root();
let block_peer_disconnected = self
.block_request_state
.state
.check_peer_disconnected(peer_id)
.is_err();
let blob_peer_disconnected = self
.blob_request_state
.state
.check_peer_disconnected(peer_id)
.is_err();
pub fn should_drop_lookup_on_disconnected_peer(&mut self, peer_id: &PeerId) -> bool {
self.block_request_state.state.remove_peer(peer_id);
self.blob_request_state.state.remove_peer(peer_id);
if block_peer_disconnected || blob_peer_disconnected {
if let Err(e) = self.request_block_and_blobs(cx) {
debug!(log, "Single lookup failed on peer disconnection"; "block_root" => ?block_root, "error" => ?e);
return true;
}
if self.all_available_peers().count() == 0 {
return true;
}
// Note: if the peer disconnected happens to have an on-going request associated with this
// lookup we will receive an RPCError and the lookup will fail. No need to manually retry
// now.
false
}
/// Returns `true` if the block has already been downloaded.
pub(crate) fn block_already_downloaded(&self) -> bool {
if let Some(components) = self.child_components.as_ref() {
components.downloaded_block.is_some()
} else {
self.da_checker.has_block(&self.block_root())
}
}
/// Updates the `requested_ids` field of the `BlockRequestState` with the most recent picture
/// of which blobs still need to be requested. Returns `true` if there are no more blobs to
/// request.
pub(crate) fn blobs_already_downloaded(&mut self) -> bool {
if matches!(self.blob_request_state.state.state, State::AwaitingDownload) {
self.update_blobs_request();
}
self.blob_request_state.requested_ids.is_empty()
}
/// Updates this request with the most recent picture of which blobs still need to be requested.
pub fn update_blobs_request(&mut self) {
self.blob_request_state.requested_ids = self.missing_blob_ids();
}
/// If `child_components` is `Some`, we know block components won't hit the data
/// availability cache, so we don't check its processing cache unless `child_components`
/// is `None`.
pub(crate) fn missing_blob_ids(&self) -> MissingBlobs {
let block_root = self.block_root();
if let Some(components) = self.child_components.as_ref() {
self.da_checker.get_missing_blob_ids(
block_root,
components.downloaded_block.as_ref().map(|b| b.as_ref()),
&components.downloaded_blobs,
)
} else {
self.da_checker.get_missing_blob_ids_with(block_root)
}
}
/// Penalizes a blob peer if it should have blobs but didn't return them to us.
pub fn penalize_blob_peer(&mut self, cx: &SyncNetworkContext<T>) {
if let Ok(blob_peer) = self.blob_request_state.state.processing_peer() {
cx.report_peer(
blob_peer,
PeerAction::MidToleranceError,
"single_blob_failure",
);
}
}
/// This failure occurs on download, so register a failure downloading, penalize the peer
/// and clear the blob cache.
pub fn handle_consistency_failure(&mut self, cx: &SyncNetworkContext<T>) {
self.penalize_blob_peer(cx);
if let Some(cached_child) = self.child_components.as_mut() {
cached_child.clear_blobs();
}
self.blob_request_state.state.on_download_failure()
}
/// This failure occurs after processing, so register a failure processing, penalize the peer
/// and clear the blob cache.
pub fn handle_availability_check_failure(&mut self, cx: &SyncNetworkContext<T>) {
self.penalize_blob_peer(cx);
if let Some(cached_child) = self.child_components.as_mut() {
cached_child.clear_blobs();
}
self.blob_request_state.state.on_processing_failure()
}
}
/// The state of the blob request component of a `SingleBlockLookup`.
pub struct BlobRequestState {
/// The latest picture of which blobs still need to be requested. This includes information
/// from both block/blobs downloaded in the network layer and any blocks/blobs that exist in
/// the data availability checker.
pub requested_ids: MissingBlobs,
pub struct BlobRequestState<E: EthSpec> {
pub block_root: Hash256,
pub state: SingleLookupRequestState,
pub state: SingleLookupRequestState<FixedBlobSidecarList<E>>,
}
impl BlobRequestState {
pub fn new(block_root: Hash256, peer_source: &[PeerId], is_deneb: bool) -> Self {
let default_ids = MissingBlobs::new_without_block(block_root, is_deneb);
impl<E: EthSpec> BlobRequestState<E> {
pub fn new(block_root: Hash256, peer_source: &[PeerId]) -> Self {
Self {
block_root,
requested_ids: default_ids,
state: SingleLookupRequestState::new(peer_source),
}
}
}
/// The state of the block request component of a `SingleBlockLookup`.
pub struct BlockRequestState {
pub struct BlockRequestState<E: EthSpec> {
pub requested_block_root: Hash256,
pub state: SingleLookupRequestState,
pub state: SingleLookupRequestState<Arc<SignedBeaconBlock<E>>>,
}
impl BlockRequestState {
impl<E: EthSpec> BlockRequestState<E> {
pub fn new(block_root: Hash256, peers: &[PeerId]) -> Self {
Self {
requested_block_root: block_root,
@@ -338,36 +210,28 @@ impl BlockRequestState {
}
}
/// This is the status of cached components for a lookup if they are required. It provides information
/// about whether we should send a responses immediately for processing, whether we require more
/// responses, or whether all cached components have been received and the reconstructed block
/// should be sent for processing.
pub enum CachedChild<E: EthSpec> {
/// All child components have been received, this is the reconstructed block, including all.
/// It has been checked for consistency between blobs and block, but no consensus checks have
/// been performed and no kzg verification has been performed.
Ok(RpcBlock<E>),
/// All child components have not yet been received.
DownloadIncomplete,
/// Child components should not be cached, send this directly for processing.
NotRequired,
/// There was an error during consistency checks between block and blobs.
Err(AvailabilityCheckError),
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct DownloadResult<T: Clone> {
pub value: T,
pub block_root: Hash256,
pub seen_timestamp: Duration,
pub peer_id: PeerId,
}
#[derive(Debug, PartialEq, Eq)]
pub enum State {
pub enum State<T: Clone> {
AwaitingDownload,
Downloading { peer_id: PeerId },
Processing { peer_id: PeerId },
Processed { peer_id: PeerId },
Downloading,
AwaitingProcess(DownloadResult<T>),
Processing(DownloadResult<T>),
Processed(PeerId),
}
/// Object representing the state of a single block or blob lookup request.
#[derive(PartialEq, Eq, Debug)]
pub struct SingleLookupRequestState {
pub struct SingleLookupRequestState<T: Clone> {
/// State of this request.
state: State,
state: State<T>,
/// Peers that should have this block or blob.
available_peers: HashSet<PeerId>,
/// Peers from which we have requested this block.
@@ -376,15 +240,9 @@ pub struct SingleLookupRequestState {
failed_processing: u8,
/// How many times have we attempted to download this block or blob.
failed_downloading: u8,
/// Should be incremented everytime this request is retried. The purpose of this is to
/// differentiate retries of the same block/blob request within a lookup. We currently penalize
/// peers and retry requests prior to receiving the stream terminator. This means responses
/// from a prior request may arrive after a new request has been sent, this counter allows
/// us to differentiate these two responses.
req_counter: u32,
}
impl SingleLookupRequestState {
impl<T: Clone> SingleLookupRequestState<T> {
pub fn new(peers: &[PeerId]) -> Self {
let mut available_peers = HashSet::default();
for peer in peers.iter().copied() {
@@ -397,74 +255,159 @@ impl SingleLookupRequestState {
used_peers: HashSet::default(),
failed_processing: 0,
failed_downloading: 0,
req_counter: 0,
}
}
pub fn is_current_req_counter(&self, req_counter: u32) -> bool {
self.req_counter == req_counter
}
pub fn is_awaiting_download(&self) -> bool {
matches!(self.state, State::AwaitingDownload)
}
pub fn is_downloaded(&self) -> bool {
match self.state {
State::AwaitingDownload => false,
State::Downloading { .. } => false,
State::Processing { .. } => true,
State::Processed { .. } => true,
State::AwaitingDownload => true,
State::Downloading { .. }
| State::AwaitingProcess { .. }
| State::Processing { .. }
| State::Processed { .. } => false,
}
}
pub fn is_processed(&self) -> bool {
match self.state {
State::AwaitingDownload => false,
State::Downloading { .. } => false,
State::Processing { .. } => false,
State::AwaitingDownload
| State::Downloading { .. }
| State::AwaitingProcess { .. }
| State::Processing { .. } => false,
State::Processed { .. } => true,
}
}
pub fn on_download_start(&mut self, peer_id: PeerId) -> u32 {
self.state = State::Downloading { peer_id };
self.req_counter += 1;
self.req_counter
pub fn peek_downloaded_data(&self) -> Option<&T> {
match &self.state {
State::AwaitingDownload => None,
State::Downloading { .. } => None,
State::AwaitingProcess(result) => Some(&result.value),
State::Processing(result) => Some(&result.value),
State::Processed { .. } => None,
}
}
/// Switch to `AwaitingProcessing` if the request is in `AwaitingDownload` state, otherwise
/// ignore.
pub fn insert_verified_response(&mut self, result: DownloadResult<T>) -> bool {
if let State::AwaitingDownload = &self.state {
self.state = State::AwaitingProcess(result);
true
} else {
false
}
}
/// Switch to `Downloading` if the request is in `AwaitingDownload` state, otherwise returns None.
pub fn on_download_start(&mut self) -> Result<(), LookupRequestError> {
match &self.state {
State::AwaitingDownload => {
self.state = State::Downloading;
Ok(())
}
other => Err(LookupRequestError::BadState(format!(
"Bad state on_download_start expected AwaitingDownload got {other}"
))),
}
}
/// Registers a failure in downloading a block. This might be a peer disconnection or a wrong
/// block.
pub fn on_download_failure(&mut self) {
self.failed_downloading = self.failed_downloading.saturating_add(1);
self.state = State::AwaitingDownload;
}
pub fn on_download_success(&mut self) -> Result<(), String> {
pub fn on_download_failure(&mut self) -> Result<(), LookupRequestError> {
match &self.state {
State::Downloading { peer_id } => {
self.state = State::Processing { peer_id: *peer_id };
State::Downloading => {
self.failed_downloading = self.failed_downloading.saturating_add(1);
self.state = State::AwaitingDownload;
Ok(())
}
other => Err(format!(
"request bad state, expected downloading got {other}"
)),
other => Err(LookupRequestError::BadState(format!(
"Bad state on_download_failure expected Downloading got {other}"
))),
}
}
pub fn on_download_success(
&mut self,
result: DownloadResult<T>,
) -> Result<(), LookupRequestError> {
match &self.state {
State::Downloading => {
self.state = State::AwaitingProcess(result);
Ok(())
}
other => Err(LookupRequestError::BadState(format!(
"Bad state on_download_success expected Downloading got {other}"
))),
}
}
/// Switch to `Processing` if the request is in `AwaitingProcess` state, otherwise returns None.
pub fn maybe_start_processing(&mut self) -> Option<DownloadResult<T>> {
// For 2 lines replace state with placeholder to gain ownership of `result`
match &self.state {
State::AwaitingProcess(result) => {
let result = result.clone();
self.state = State::Processing(result.clone());
Some(result)
}
_ => None,
}
}
/// Revert into `AwaitingProcessing`, if the payload if not invalid and can be submitted for
/// processing latter.
pub fn revert_to_awaiting_processing(&mut self) -> Result<(), LookupRequestError> {
match &self.state {
State::Processing(result) => {
self.state = State::AwaitingProcess(result.clone());
Ok(())
}
other => Err(LookupRequestError::BadState(format!(
"Bad state on revert_to_awaiting_processing expected Processing got {other}"
))),
}
}
/// Registers a failure in processing a block.
pub fn on_processing_failure(&mut self) {
self.failed_processing = self.failed_processing.saturating_add(1);
self.state = State::AwaitingDownload;
pub fn on_processing_failure(&mut self) -> Result<PeerId, LookupRequestError> {
match &self.state {
State::Processing(result) => {
let peer_id = result.peer_id;
self.failed_processing = self.failed_processing.saturating_add(1);
self.state = State::AwaitingDownload;
Ok(peer_id)
}
other => Err(LookupRequestError::BadState(format!(
"Bad state on_processing_failure expected Processing got {other}"
))),
}
}
pub fn on_processing_success(&mut self) -> Result<(), String> {
pub fn on_processing_success(&mut self) -> Result<PeerId, LookupRequestError> {
match &self.state {
State::Processing { peer_id } => {
self.state = State::Processed { peer_id: *peer_id };
Ok(())
State::Processing(result) => {
let peer_id = result.peer_id;
self.state = State::Processed(peer_id);
Ok(peer_id)
}
other => Err(format!("not in processing state: {}", other).to_string()),
other => Err(LookupRequestError::BadState(format!(
"Bad state on_processing_success expected Processing got {other}"
))),
}
}
pub fn on_post_process_validation_failure(&mut self) -> Result<PeerId, LookupRequestError> {
match &self.state {
State::Processed(peer_id) => {
let peer_id = *peer_id;
self.failed_processing = self.failed_processing.saturating_add(1);
self.state = State::AwaitingDownload;
Ok(peer_id)
}
other => Err(LookupRequestError::BadState(format!(
"Bad state on_post_process_validation_failure expected Processed got {other}"
))),
}
}
@@ -483,31 +426,18 @@ impl SingleLookupRequestState {
}
/// If a peer disconnects, this request could be failed. If so, an error is returned
pub fn check_peer_disconnected(&mut self, dc_peer_id: &PeerId) -> Result<(), ()> {
self.available_peers.remove(dc_peer_id);
if let State::Downloading { peer_id } = &self.state {
if peer_id == dc_peer_id {
// Peer disconnected before providing a block
self.on_download_failure();
return Err(());
}
}
Ok(())
}
/// Returns the id peer we downloaded from if we have downloaded a verified block, otherwise
/// returns an error.
pub fn processing_peer(&self) -> Result<PeerId, String> {
match &self.state {
State::Processing { peer_id } | State::Processed { peer_id } => Ok(*peer_id),
other => Err(format!("not in processing state: {}", other).to_string()),
}
pub fn remove_peer(&mut self, disconnected_peer_id: &PeerId) {
self.available_peers.remove(disconnected_peer_id);
}
pub fn get_used_peers(&self) -> impl Iterator<Item = &PeerId> {
self.used_peers.iter()
}
pub fn get_available_peers(&self) -> impl Iterator<Item = &PeerId> {
self.available_peers.iter()
}
/// Selects a random peer from available peers if any, inserts it in used peers and returns it.
pub fn use_rand_available_peer(&mut self) -> Option<PeerId> {
let peer_id = self
@@ -520,65 +450,25 @@ impl SingleLookupRequestState {
}
}
impl<T: BeaconChainTypes> slog::Value for SingleBlockLookup<T> {
fn serialize(
&self,
_record: &slog::Record,
key: slog::Key,
serializer: &mut dyn slog::Serializer,
) -> slog::Result {
serializer.emit_str("request", key)?;
serializer.emit_arguments("lookup_type", &format_args!("{:?}", self.lookup_type))?;
serializer.emit_arguments("hash", &format_args!("{}", self.block_root()))?;
serializer.emit_arguments(
"blob_ids",
&format_args!("{:?}", self.blob_request_state.requested_ids.indices()),
)?;
serializer.emit_arguments(
"block_request_state.state",
&format_args!("{:?}", self.block_request_state.state),
)?;
serializer.emit_arguments(
"blob_request_state.state",
&format_args!("{:?}", self.blob_request_state.state),
)?;
slog::Result::Ok(())
}
}
impl slog::Value for SingleLookupRequestState {
fn serialize(
&self,
record: &slog::Record,
key: slog::Key,
serializer: &mut dyn slog::Serializer,
) -> slog::Result {
serializer.emit_str("request_state", key)?;
match &self.state {
State::AwaitingDownload => {
"awaiting_download".serialize(record, "state", serializer)?
}
State::Downloading { peer_id } => {
serializer.emit_arguments("downloading_peer", &format_args!("{}", peer_id))?
}
State::Processing { peer_id } => {
serializer.emit_arguments("processing_peer", &format_args!("{}", peer_id))?
}
State::Processed { .. } => "processed".serialize(record, "state", serializer)?,
}
serializer.emit_u8("failed_downloads", self.failed_downloading)?;
serializer.emit_u8("failed_processing", self.failed_processing)?;
slog::Result::Ok(())
}
}
impl std::fmt::Display for State {
impl<T: Clone> std::fmt::Display for State<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
State::AwaitingDownload => write!(f, "AwaitingDownload"),
State::Downloading { .. } => write!(f, "Downloading"),
State::AwaitingProcess { .. } => write!(f, "AwaitingProcessing"),
State::Processing { .. } => write!(f, "Processing"),
State::Processed { .. } => write!(f, "Processed"),
}
}
}
impl LookupRequestError {
pub(crate) fn as_metric(&self) -> &'static str {
match self {
LookupRequestError::TooManyAttempts { .. } => "TooManyAttempts",
LookupRequestError::NoPeers => "NoPeers",
LookupRequestError::SendFailed { .. } => "SendFailed",
LookupRequestError::BadState { .. } => "BadState",
}
}
}

File diff suppressed because it is too large Load Diff