Drop lookup type trait for a simple arg (#5620)

* Drop lookup type trait for a simple arg
This commit is contained in:
Lion - dapplion
2024-04-24 15:02:45 +09:00
committed by GitHub
parent 1eaaa4a8bd
commit c38b05d640
5 changed files with 73 additions and 230 deletions

View File

@@ -29,34 +29,12 @@ pub enum LookupType {
Parent, Parent,
} }
/// This trait helps differentiate `SingleBlockLookup`s from `ParentLookup`s .This is useful in impl LookupType {
/// ensuring requests and responses are handled separately and enables us to use different failure fn max_attempts(&self) -> u8 {
/// tolerances for each, while re-using the same basic request and retry logic. match self {
pub trait Lookup { LookupType::Current => SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS,
const MAX_ATTEMPTS: u8; LookupType::Parent => PARENT_FAIL_TOLERANCE,
fn lookup_type() -> LookupType; }
fn max_attempts() -> u8 {
Self::MAX_ATTEMPTS
}
}
/// A `Lookup` that is a part of a `ParentLookup`.
pub struct Parent;
impl Lookup for Parent {
const MAX_ATTEMPTS: u8 = PARENT_FAIL_TOLERANCE;
fn lookup_type() -> LookupType {
LookupType::Parent
}
}
/// A `Lookup` that part of a single block lookup.
pub struct Current;
impl Lookup for Current {
const MAX_ATTEMPTS: u8 = SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS;
fn lookup_type() -> LookupType {
LookupType::Current
} }
} }
@@ -68,7 +46,7 @@ impl Lookup for Current {
/// The use of the `ResponseType` associated type gives us a degree of type /// The use of the `ResponseType` associated type gives us a degree of type
/// safety when handling a block/blob response ensuring we only mutate the correct corresponding /// safety when handling a block/blob response ensuring we only mutate the correct corresponding
/// state. /// state.
pub trait RequestState<L: Lookup, T: BeaconChainTypes> { pub trait RequestState<T: BeaconChainTypes> {
/// The type of the request . /// The type of the request .
type RequestType; type RequestType;
@@ -81,9 +59,12 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
/* Request building methods */ /* Request building methods */
/// Construct a new request. /// Construct a new request.
fn build_request(&mut self) -> Result<(PeerId, Self::RequestType), LookupRequestError> { fn build_request(
&mut self,
lookup_type: LookupType,
) -> Result<(PeerId, Self::RequestType), LookupRequestError> {
// Verify and construct request. // Verify and construct request.
self.too_many_attempts()?; self.too_many_attempts(lookup_type)?;
let peer = self.get_peer()?; let peer = self.get_peer()?;
let request = self.new_request(); let request = self.new_request();
Ok((peer, request)) Ok((peer, request))
@@ -93,6 +74,7 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
fn build_request_and_send( fn build_request_and_send(
&mut self, &mut self,
id: Id, id: Id,
lookup_type: LookupType,
cx: &mut SyncNetworkContext<T>, cx: &mut SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> { ) -> Result<(), LookupRequestError> {
// Check if request is necessary. // Check if request is necessary.
@@ -101,7 +83,7 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
} }
// Construct request. // Construct request.
let (peer_id, request) = self.build_request()?; let (peer_id, request) = self.build_request(lookup_type)?;
// Update request state. // Update request state.
let req_counter = self.get_state_mut().on_download_start(peer_id); let req_counter = self.get_state_mut().on_download_start(peer_id);
@@ -110,17 +92,16 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
let id = SingleLookupReqId { let id = SingleLookupReqId {
id, id,
req_counter, req_counter,
lookup_type: L::lookup_type(), lookup_type,
}; };
Self::make_request(id, peer_id, request, cx) Self::make_request(id, peer_id, request, cx)
} }
/// Verify the current request has not exceeded the maximum number of attempts. /// Verify the current request has not exceeded the maximum number of attempts.
fn too_many_attempts(&self) -> Result<(), LookupRequestError> { fn too_many_attempts(&self, lookup_type: LookupType) -> Result<(), LookupRequestError> {
let max_attempts = L::max_attempts();
let request_state = self.get_state(); let request_state = self.get_state();
if request_state.failed_attempts() >= max_attempts { if request_state.failed_attempts() >= lookup_type.max_attempts() {
let cannot_process = request_state.more_failed_processing_attempts(); let cannot_process = request_state.more_failed_processing_attempts();
Err(LookupRequestError::TooManyAttempts { cannot_process }) Err(LookupRequestError::TooManyAttempts { cannot_process })
} else { } else {
@@ -187,7 +168,7 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
fn response_type() -> ResponseType; fn response_type() -> ResponseType;
/// A getter for the `BlockRequestState` or `BlobRequestState` associated with this trait. /// A getter for the `BlockRequestState` or `BlobRequestState` associated with this trait.
fn request_state_mut(request: &mut SingleBlockLookup<L, T>) -> &mut Self; fn request_state_mut(request: &mut SingleBlockLookup<T>) -> &mut Self;
/// A getter for a reference to the `SingleLookupRequestState` associated with this trait. /// A getter for a reference to the `SingleLookupRequestState` associated with this trait.
fn get_state(&self) -> &SingleLookupRequestState; fn get_state(&self) -> &SingleLookupRequestState;
@@ -196,7 +177,7 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
fn get_state_mut(&mut self) -> &mut SingleLookupRequestState; fn get_state_mut(&mut self) -> &mut SingleLookupRequestState;
} }
impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlockRequestState<L> { impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState {
type RequestType = BlocksByRootSingleRequest; type RequestType = BlocksByRootSingleRequest;
type VerifiedResponseType = Arc<SignedBeaconBlock<T::EthSpec>>; type VerifiedResponseType = Arc<SignedBeaconBlock<T::EthSpec>>;
type ReconstructedResponseType = RpcBlock<T::EthSpec>; type ReconstructedResponseType = RpcBlock<T::EthSpec>;
@@ -253,7 +234,7 @@ impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlockRequestState<L>
fn response_type() -> ResponseType { fn response_type() -> ResponseType {
ResponseType::Block ResponseType::Block
} }
fn request_state_mut(request: &mut SingleBlockLookup<L, T>) -> &mut Self { fn request_state_mut(request: &mut SingleBlockLookup<T>) -> &mut Self {
&mut request.block_request_state &mut request.block_request_state
} }
fn get_state(&self) -> &SingleLookupRequestState { fn get_state(&self) -> &SingleLookupRequestState {
@@ -264,7 +245,7 @@ impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlockRequestState<L>
} }
} }
impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlobRequestState<L, T::EthSpec> { impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
type RequestType = BlobsByRootSingleBlockRequest; type RequestType = BlobsByRootSingleBlockRequest;
type VerifiedResponseType = FixedBlobSidecarList<T::EthSpec>; type VerifiedResponseType = FixedBlobSidecarList<T::EthSpec>;
type ReconstructedResponseType = FixedBlobSidecarList<T::EthSpec>; type ReconstructedResponseType = FixedBlobSidecarList<T::EthSpec>;
@@ -328,7 +309,7 @@ impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlobRequestState<L,
fn response_type() -> ResponseType { fn response_type() -> ResponseType {
ResponseType::Blob ResponseType::Blob
} }
fn request_state_mut(request: &mut SingleBlockLookup<L, T>) -> &mut Self { fn request_state_mut(request: &mut SingleBlockLookup<T>) -> &mut Self {
&mut request.blob_request_state &mut request.blob_request_state
} }
fn get_state(&self) -> &SingleLookupRequestState { fn get_state(&self) -> &SingleLookupRequestState {

View File

@@ -16,9 +16,6 @@ use beacon_chain::data_availability_checker::{
}; };
use beacon_chain::validator_monitor::timestamp_now; use beacon_chain::validator_monitor::timestamp_now;
use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError};
pub use common::Current;
pub use common::Lookup;
pub use common::Parent;
pub use common::RequestState; pub use common::RequestState;
use fnv::FnvHashMap; use fnv::FnvHashMap;
use lighthouse_network::{PeerAction, PeerId}; use lighthouse_network::{PeerAction, PeerId};
@@ -55,12 +52,12 @@ pub struct BlockLookups<T: BeaconChainTypes> {
/// Parent chain lookups being downloaded. /// Parent chain lookups being downloaded.
parent_lookups: SmallVec<[ParentLookup<T>; 3]>, parent_lookups: SmallVec<[ParentLookup<T>; 3]>,
processing_parent_lookups: HashMap<Hash256, (Vec<Hash256>, SingleBlockLookup<Parent, T>)>, processing_parent_lookups: HashMap<Hash256, (Vec<Hash256>, SingleBlockLookup<T>)>,
/// A cache of failed chain lookups to prevent duplicate searches. /// A cache of failed chain lookups to prevent duplicate searches.
failed_chains: LRUTimeCache<Hash256>, failed_chains: LRUTimeCache<Hash256>,
single_block_lookups: FnvHashMap<Id, SingleBlockLookup<Current, T>>, single_block_lookups: FnvHashMap<Id, SingleBlockLookup<T>>,
pub(crate) da_checker: Arc<DataAvailabilityChecker<T>>, pub(crate) da_checker: Arc<DataAvailabilityChecker<T>>,
@@ -131,7 +128,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
/// Attempts to trigger the request matching the given `block_root`. /// Attempts to trigger the request matching the given `block_root`.
pub fn trigger_single_lookup( pub fn trigger_single_lookup(
&mut self, &mut self,
mut single_block_lookup: SingleBlockLookup<Current, T>, mut single_block_lookup: SingleBlockLookup<T>,
cx: &mut SyncNetworkContext<T>, cx: &mut SyncNetworkContext<T>,
) { ) {
let block_root = single_block_lookup.block_root(); let block_root = single_block_lookup.block_root();
@@ -147,7 +144,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
} }
/// Adds a lookup to the `single_block_lookups` map. /// Adds a lookup to the `single_block_lookups` map.
pub fn add_single_lookup(&mut self, single_block_lookup: SingleBlockLookup<Current, T>) { pub fn add_single_lookup(&mut self, single_block_lookup: SingleBlockLookup<T>) {
self.single_block_lookups self.single_block_lookups
.insert(single_block_lookup.id, single_block_lookup); .insert(single_block_lookup.id, single_block_lookup);
@@ -212,6 +209,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
peers, peers,
self.da_checker.clone(), self.da_checker.clone(),
cx.next_id(), cx.next_id(),
LookupType::Current,
); );
debug!( debug!(
@@ -284,10 +282,10 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
/// Get a single block lookup by its ID. This method additionally ensures the `req_counter` /// Get a single block lookup by its ID. This method additionally ensures the `req_counter`
/// matches the current `req_counter` for the lookup. This ensures any stale responses from requests /// matches the current `req_counter` for the lookup. This ensures any stale responses from requests
/// that have been retried are ignored. /// that have been retried are ignored.
fn get_single_lookup<R: RequestState<Current, T>>( fn get_single_lookup<R: RequestState<T>>(
&mut self, &mut self,
id: SingleLookupReqId, id: SingleLookupReqId,
) -> Option<SingleBlockLookup<Current, T>> { ) -> Option<SingleBlockLookup<T>> {
let mut lookup = self.single_block_lookups.remove(&id.id)?; let mut lookup = self.single_block_lookups.remove(&id.id)?;
let request_state = R::request_state_mut(&mut lookup); let request_state = R::request_state_mut(&mut lookup);
@@ -314,7 +312,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
} }
/// Process a block or blob response received from a single lookup request. /// Process a block or blob response received from a single lookup request.
pub fn single_lookup_response<R: RequestState<Current, T>>( pub fn single_lookup_response<R: RequestState<T>>(
&mut self, &mut self,
lookup_id: SingleLookupReqId, lookup_id: SingleLookupReqId,
peer_id: PeerId, peer_id: PeerId,
@@ -345,7 +343,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
"response_type" => ?response_type, "response_type" => ?response_type,
); );
match self.handle_verified_response::<Current, R>( match self.handle_verified_response::<R>(
seen_timestamp, seen_timestamp,
cx, cx,
BlockProcessType::SingleBlock { id: lookup.id }, BlockProcessType::SingleBlock { id: lookup.id },
@@ -372,13 +370,13 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
/// Consolidates error handling for `single_lookup_response`. An `Err` here should always mean /// Consolidates error handling for `single_lookup_response`. An `Err` here should always mean
/// the lookup is dropped. /// the lookup is dropped.
fn handle_verified_response<L: Lookup, R: RequestState<L, T>>( fn handle_verified_response<R: RequestState<T>>(
&self, &self,
seen_timestamp: Duration, seen_timestamp: Duration,
cx: &mut SyncNetworkContext<T>, cx: &mut SyncNetworkContext<T>,
process_type: BlockProcessType, process_type: BlockProcessType,
verified_response: R::VerifiedResponseType, verified_response: R::VerifiedResponseType,
lookup: &mut SingleBlockLookup<L, T>, lookup: &mut SingleBlockLookup<T>,
) -> Result<(), LookupRequestError> { ) -> Result<(), LookupRequestError> {
let id = lookup.id; let id = lookup.id;
let block_root = lookup.block_root(); let block_root = lookup.block_root();
@@ -389,7 +387,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// If we have an outstanding parent request for this block, delay sending the response until // If we have an outstanding parent request for this block, delay sending the response until
// all parent blocks have been processed, otherwise we will fail validation with an // all parent blocks have been processed, otherwise we will fail validation with an
// `UnknownParent`. // `UnknownParent`.
let delay_send = match L::lookup_type() { let delay_send = match lookup.lookup_type {
LookupType::Parent => false, LookupType::Parent => false,
LookupType::Current => self.has_pending_parent_request(lookup.block_root()), LookupType::Current => self.has_pending_parent_request(lookup.block_root()),
}; };
@@ -453,7 +451,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
/// Get a parent block lookup by its ID. This method additionally ensures the `req_counter` /// Get a parent block lookup by its ID. This method additionally ensures the `req_counter`
/// matches the current `req_counter` for the lookup. This any stale responses from requests /// matches the current `req_counter` for the lookup. This any stale responses from requests
/// that have been retried are ignored. /// that have been retried are ignored.
fn get_parent_lookup<R: RequestState<Parent, T>>( fn get_parent_lookup<R: RequestState<T>>(
&mut self, &mut self,
id: SingleLookupReqId, id: SingleLookupReqId,
) -> Option<ParentLookup<T>> { ) -> Option<ParentLookup<T>> {
@@ -479,7 +477,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
} }
/// Process a response received from a parent lookup request. /// Process a response received from a parent lookup request.
pub fn parent_lookup_response<R: RequestState<Parent, T>>( pub fn parent_lookup_response<R: RequestState<T>>(
&mut self, &mut self,
id: SingleLookupReqId, id: SingleLookupReqId,
peer_id: PeerId, peer_id: PeerId,
@@ -523,7 +521,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
/// Consolidates error handling for `parent_lookup_response`. An `Err` here should always mean /// Consolidates error handling for `parent_lookup_response`. An `Err` here should always mean
/// the lookup is dropped. /// the lookup is dropped.
fn parent_lookup_response_inner<R: RequestState<Parent, T>>( fn parent_lookup_response_inner<R: RequestState<T>>(
&mut self, &mut self,
peer_id: PeerId, peer_id: PeerId,
response: R::VerifiedResponseType, response: R::VerifiedResponseType,
@@ -554,7 +552,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
} }
} }
self.handle_verified_response::<Parent, R>( self.handle_verified_response::<R>(
seen_timestamp, seen_timestamp,
cx, cx,
BlockProcessType::ParentLookup { BlockProcessType::ParentLookup {
@@ -633,7 +631,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
} }
/// An RPC error has occurred during a parent lookup. This function handles this case. /// An RPC error has occurred during a parent lookup. This function handles this case.
pub fn parent_lookup_failed<R: RequestState<Parent, T>>( pub fn parent_lookup_failed<R: RequestState<T>>(
&mut self, &mut self,
id: SingleLookupReqId, id: SingleLookupReqId,
peer_id: &PeerId, peer_id: &PeerId,
@@ -669,7 +667,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
} }
/// An RPC error has occurred during a single lookup. This function handles this case.\ /// An RPC error has occurred during a single lookup. This function handles this case.\
pub fn single_block_lookup_failed<R: RequestState<Current, T>>( pub fn single_block_lookup_failed<R: RequestState<T>>(
&mut self, &mut self,
id: SingleLookupReqId, id: SingleLookupReqId,
peer_id: &PeerId, peer_id: &PeerId,
@@ -717,7 +715,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
/* Processing responses */ /* Processing responses */
pub fn single_block_component_processed<R: RequestState<Current, T>>( pub fn single_block_component_processed<R: RequestState<T>>(
&mut self, &mut self,
target_id: Id, target_id: Id,
result: BlockProcessingResult<T::EthSpec>, result: BlockProcessingResult<T::EthSpec>,

View File

@@ -1,6 +1,6 @@
use super::common::LookupType;
use super::single_block_lookup::{LookupRequestError, SingleBlockLookup}; use super::single_block_lookup::{LookupRequestError, SingleBlockLookup};
use super::{DownloadedBlock, PeerId}; use super::{DownloadedBlock, PeerId};
use crate::sync::block_lookups::common::Parent;
use crate::sync::{manager::SLOT_IMPORT_TOLERANCE, network_context::SyncNetworkContext}; use crate::sync::{manager::SLOT_IMPORT_TOLERANCE, network_context::SyncNetworkContext};
use beacon_chain::block_verification_types::AsBlock; use beacon_chain::block_verification_types::AsBlock;
use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::block_verification_types::RpcBlock;
@@ -24,7 +24,7 @@ pub(crate) struct ParentLookup<T: BeaconChainTypes> {
/// The blocks that have currently been downloaded. /// The blocks that have currently been downloaded.
downloaded_blocks: Vec<DownloadedBlock<T::EthSpec>>, downloaded_blocks: Vec<DownloadedBlock<T::EthSpec>>,
/// Request of the last parent. /// Request of the last parent.
pub current_parent_request: SingleBlockLookup<Parent, T>, pub current_parent_request: SingleBlockLookup<T>,
} }
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
@@ -55,6 +55,7 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
&[peer_id], &[peer_id],
da_checker, da_checker,
cx.next_id(), cx.next_id(),
LookupType::Parent,
); );
Self { Self {
@@ -132,7 +133,7 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
Hash256, Hash256,
VecDeque<RpcBlock<T::EthSpec>>, VecDeque<RpcBlock<T::EthSpec>>,
Vec<Hash256>, Vec<Hash256>,
SingleBlockLookup<Parent, T>, SingleBlockLookup<T>,
) { ) {
let ParentLookup { let ParentLookup {
chain_hash, chain_hash,

View File

@@ -1,5 +1,6 @@
use super::common::LookupType;
use super::PeerId; use super::PeerId;
use crate::sync::block_lookups::common::{Lookup, RequestState}; use crate::sync::block_lookups::common::RequestState;
use crate::sync::block_lookups::Id; use crate::sync::block_lookups::Id;
use crate::sync::network_context::SyncNetworkContext; use crate::sync::network_context::SyncNetworkContext;
use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::block_verification_types::RpcBlock;
@@ -14,7 +15,6 @@ use rand::seq::IteratorRandom;
use slog::{debug, Logger}; use slog::{debug, Logger};
use std::collections::HashSet; use std::collections::HashSet;
use std::fmt::Debug; use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc; use std::sync::Arc;
use store::Hash256; use store::Hash256;
use strum::IntoStaticStr; use strum::IntoStaticStr;
@@ -33,27 +33,30 @@ pub enum LookupRequestError {
BadState(String), BadState(String),
} }
pub struct SingleBlockLookup<L: Lookup, T: BeaconChainTypes> { pub struct SingleBlockLookup<T: BeaconChainTypes> {
pub id: Id, pub id: Id,
pub block_request_state: BlockRequestState<L>, pub lookup_type: LookupType,
pub blob_request_state: BlobRequestState<L, T::EthSpec>, pub block_request_state: BlockRequestState,
pub blob_request_state: BlobRequestState<T::EthSpec>,
pub da_checker: Arc<DataAvailabilityChecker<T>>, pub da_checker: Arc<DataAvailabilityChecker<T>>,
/// Only necessary for requests triggered by an `UnknownBlockParent` or `UnknownBlockParent` /// Only necessary for requests triggered by an `UnknownBlockParent` or `UnknownBlockParent`
/// because any blocks or blobs without parents won't hit the data availability cache. /// because any blocks or blobs without parents won't hit the data availability cache.
pub child_components: Option<ChildComponents<T::EthSpec>>, pub child_components: Option<ChildComponents<T::EthSpec>>,
} }
impl<L: Lookup, T: BeaconChainTypes> SingleBlockLookup<L, T> { impl<T: BeaconChainTypes> SingleBlockLookup<T> {
pub fn new( pub fn new(
requested_block_root: Hash256, requested_block_root: Hash256,
child_components: Option<ChildComponents<T::EthSpec>>, child_components: Option<ChildComponents<T::EthSpec>>,
peers: &[PeerId], peers: &[PeerId],
da_checker: Arc<DataAvailabilityChecker<T>>, da_checker: Arc<DataAvailabilityChecker<T>>,
id: Id, id: Id,
lookup_type: LookupType,
) -> Self { ) -> Self {
let is_deneb = da_checker.is_deneb(); let is_deneb = da_checker.is_deneb();
Self { Self {
id, id,
lookup_type,
block_request_state: BlockRequestState::new(requested_block_root, peers), block_request_state: BlockRequestState::new(requested_block_root, peers),
blob_request_state: BlobRequestState::new(requested_block_root, peers, is_deneb), blob_request_state: BlobRequestState::new(requested_block_root, peers, is_deneb),
da_checker, da_checker,
@@ -103,11 +106,11 @@ impl<L: Lookup, T: BeaconChainTypes> SingleBlockLookup<L, T> {
if !block_already_downloaded { if !block_already_downloaded {
self.block_request_state self.block_request_state
.build_request_and_send(self.id, cx)?; .build_request_and_send(self.id, self.lookup_type, cx)?;
} }
if !blobs_already_downloaded { if !blobs_already_downloaded {
self.blob_request_state self.blob_request_state
.build_request_and_send(self.id, cx)?; .build_request_and_send(self.id, self.lookup_type, cx)?;
} }
Ok(()) Ok(())
} }
@@ -144,7 +147,7 @@ impl<L: Lookup, T: BeaconChainTypes> SingleBlockLookup<L, T> {
/// Accepts a verified response, and adds it to the child components if required. This method /// Accepts a verified response, and adds it to the child components if required. This method
/// returns a `CachedChild` which provides a completed block + blob response if all components have been /// returns a `CachedChild` which provides a completed block + blob response if all components have been
/// received, or information about whether the child is required and if it has been downloaded. /// received, or information about whether the child is required and if it has been downloaded.
pub fn add_response<R: RequestState<L, T>>( pub fn add_response<R: RequestState<T>>(
&mut self, &mut self,
verified_response: R::VerifiedResponseType, verified_response: R::VerifiedResponseType,
) -> CachedChild<T::EthSpec> { ) -> CachedChild<T::EthSpec> {
@@ -301,7 +304,7 @@ impl<L: Lookup, T: BeaconChainTypes> SingleBlockLookup<L, T> {
} }
/// The state of the blob request component of a `SingleBlockLookup`. /// The state of the blob request component of a `SingleBlockLookup`.
pub struct BlobRequestState<L: Lookup, E: EthSpec> { pub struct BlobRequestState<E: EthSpec> {
/// The latest picture of which blobs still need to be requested. This includes information /// The latest picture of which blobs still need to be requested. This includes information
/// from both block/blobs downloaded in the network layer and any blocks/blobs that exist in /// from both block/blobs downloaded in the network layer and any blocks/blobs that exist in
/// the data availability checker. /// the data availability checker.
@@ -310,10 +313,9 @@ pub struct BlobRequestState<L: Lookup, E: EthSpec> {
/// Where we store blobs until we receive the stream terminator. /// Where we store blobs until we receive the stream terminator.
pub blob_download_queue: FixedBlobSidecarList<E>, pub blob_download_queue: FixedBlobSidecarList<E>,
pub state: SingleLookupRequestState, pub state: SingleLookupRequestState,
_phantom: PhantomData<L>,
} }
impl<L: Lookup, E: EthSpec> BlobRequestState<L, E> { impl<E: EthSpec> BlobRequestState<E> {
pub fn new(block_root: Hash256, peer_source: &[PeerId], is_deneb: bool) -> Self { pub fn new(block_root: Hash256, peer_source: &[PeerId], is_deneb: bool) -> Self {
let default_ids = MissingBlobs::new_without_block(block_root, is_deneb); let default_ids = MissingBlobs::new_without_block(block_root, is_deneb);
Self { Self {
@@ -321,24 +323,21 @@ impl<L: Lookup, E: EthSpec> BlobRequestState<L, E> {
requested_ids: default_ids, requested_ids: default_ids,
blob_download_queue: <_>::default(), blob_download_queue: <_>::default(),
state: SingleLookupRequestState::new(peer_source), state: SingleLookupRequestState::new(peer_source),
_phantom: PhantomData,
} }
} }
} }
/// The state of the block request component of a `SingleBlockLookup`. /// The state of the block request component of a `SingleBlockLookup`.
pub struct BlockRequestState<L: Lookup> { pub struct BlockRequestState {
pub requested_block_root: Hash256, pub requested_block_root: Hash256,
pub state: SingleLookupRequestState, pub state: SingleLookupRequestState,
_phantom: PhantomData<L>,
} }
impl<L: Lookup> BlockRequestState<L> { impl BlockRequestState {
pub fn new(block_root: Hash256, peers: &[PeerId]) -> Self { pub fn new(block_root: Hash256, peers: &[PeerId]) -> Self {
Self { Self {
requested_block_root: block_root, requested_block_root: block_root,
state: SingleLookupRequestState::new(peers), state: SingleLookupRequestState::new(peers),
_phantom: PhantomData,
} }
} }
} }
@@ -525,7 +524,7 @@ impl SingleLookupRequestState {
} }
} }
impl<L: Lookup, T: BeaconChainTypes> slog::Value for SingleBlockLookup<L, T> { impl<T: BeaconChainTypes> slog::Value for SingleBlockLookup<T> {
fn serialize( fn serialize(
&self, &self,
_record: &slog::Record, _record: &slog::Record,
@@ -533,7 +532,7 @@ impl<L: Lookup, T: BeaconChainTypes> slog::Value for SingleBlockLookup<L, T> {
serializer: &mut dyn slog::Serializer, serializer: &mut dyn slog::Serializer,
) -> slog::Result { ) -> slog::Result {
serializer.emit_str("request", key)?; serializer.emit_str("request", key)?;
serializer.emit_arguments("lookup_type", &format_args!("{:?}", L::lookup_type()))?; serializer.emit_arguments("lookup_type", &format_args!("{:?}", self.lookup_type))?;
serializer.emit_arguments("hash", &format_args!("{}", self.block_root()))?; serializer.emit_arguments("hash", &format_args!("{}", self.block_root()))?;
serializer.emit_arguments( serializer.emit_arguments(
"blob_ids", "blob_ids",
@@ -587,138 +586,3 @@ impl std::fmt::Display for State {
} }
} }
} }
#[cfg(test)]
mod tests {
use super::*;
use crate::sync::block_lookups::common::LookupType;
use beacon_chain::builder::Witness;
use beacon_chain::eth1_chain::CachingEth1Backend;
use sloggers::null::NullLoggerBuilder;
use sloggers::Build;
use slot_clock::{SlotClock, TestingSlotClock};
use std::time::Duration;
use store::{HotColdDB, MemoryStore, StoreConfig};
use types::{
test_utils::{SeedableRng, TestRandom, XorShiftRng},
ChainSpec, MinimalEthSpec as E, SignedBeaconBlock, Slot,
};
fn rand_block() -> SignedBeaconBlock<E> {
let mut rng = XorShiftRng::from_seed([42; 16]);
SignedBeaconBlock::from_block(
types::BeaconBlock::Base(types::BeaconBlockBase {
..<_>::random_for_test(&mut rng)
}),
types::Signature::random_for_test(&mut rng),
)
}
type T = Witness<TestingSlotClock, CachingEth1Backend<E>, E, MemoryStore<E>, MemoryStore<E>>;
struct TestLookup1;
impl Lookup for TestLookup1 {
const MAX_ATTEMPTS: u8 = 3;
fn lookup_type() -> LookupType {
panic!()
}
}
struct TestLookup2;
impl Lookup for TestLookup2 {
const MAX_ATTEMPTS: u8 = 4;
fn lookup_type() -> LookupType {
panic!()
}
}
#[test]
fn test_happy_path() {
let peer_id = PeerId::random();
let block = rand_block();
let spec = E::default_spec();
let slot_clock = TestingSlotClock::new(
Slot::new(0),
Duration::from_secs(0),
Duration::from_secs(spec.seconds_per_slot),
);
let log = NullLoggerBuilder.build().expect("logger should build");
let store =
HotColdDB::open_ephemeral(StoreConfig::default(), ChainSpec::minimal(), log.clone())
.expect("store");
let da_checker = Arc::new(
DataAvailabilityChecker::new(slot_clock, None, store.into(), &log, spec.clone())
.expect("data availability checker"),
);
let mut sl = SingleBlockLookup::<TestLookup1, T>::new(
block.canonical_root(),
None,
&[peer_id],
da_checker,
1,
);
<BlockRequestState<TestLookup1> as RequestState<TestLookup1, T>>::build_request(
&mut sl.block_request_state,
)
.unwrap();
sl.block_request_state.state.state = State::Downloading { peer_id };
}
#[test]
fn test_block_lookup_failures() {
let peer_id = PeerId::random();
let block = rand_block();
let spec = E::default_spec();
let slot_clock = TestingSlotClock::new(
Slot::new(0),
Duration::from_secs(0),
Duration::from_secs(spec.seconds_per_slot),
);
let log = NullLoggerBuilder.build().expect("logger should build");
let store =
HotColdDB::open_ephemeral(StoreConfig::default(), ChainSpec::minimal(), log.clone())
.expect("store");
let da_checker = Arc::new(
DataAvailabilityChecker::new(slot_clock, None, store.into(), &log, spec.clone())
.expect("data availability checker"),
);
let mut sl = SingleBlockLookup::<TestLookup2, T>::new(
block.canonical_root(),
None,
&[peer_id],
da_checker,
1,
);
for _ in 1..TestLookup2::MAX_ATTEMPTS {
<BlockRequestState<TestLookup2> as RequestState<TestLookup2, T>>::build_request(
&mut sl.block_request_state,
)
.unwrap();
sl.block_request_state.state.on_download_failure();
}
// Now we receive the block and send it for processing
<BlockRequestState<TestLookup2> as RequestState<TestLookup2, T>>::build_request(
&mut sl.block_request_state,
)
.unwrap();
sl.block_request_state.state.state = State::Downloading { peer_id };
// One processing failure maxes the available attempts
sl.block_request_state.state.on_processing_failure();
assert_eq!(
<BlockRequestState<TestLookup2> as RequestState<TestLookup2, T>>::build_request(
&mut sl.block_request_state,
)
.unwrap_err(),
LookupRequestError::TooManyAttempts {
cannot_process: false
}
)
}
}

View File

@@ -42,7 +42,6 @@ use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH};
use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProcessor}; use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProcessor};
use crate::service::NetworkMessage; use crate::service::NetworkMessage;
use crate::status::ToStatusMessage; use crate::status::ToStatusMessage;
use crate::sync::block_lookups::common::{Current, Parent};
use crate::sync::block_lookups::{BlobRequestState, BlockRequestState}; use crate::sync::block_lookups::{BlobRequestState, BlockRequestState};
use crate::sync::block_sidecar_coupling::BlocksAndBlobsRequestInfo; use crate::sync::block_sidecar_coupling::BlocksAndBlobsRequestInfo;
use beacon_chain::block_verification_types::AsBlock; use beacon_chain::block_verification_types::AsBlock;
@@ -621,14 +620,14 @@ impl<T: BeaconChainTypes> SyncManager<T> {
} => match process_type { } => match process_type {
BlockProcessType::SingleBlock { id } => self BlockProcessType::SingleBlock { id } => self
.block_lookups .block_lookups
.single_block_component_processed::<BlockRequestState<Current>>( .single_block_component_processed::<BlockRequestState>(
id, id,
result, result,
&mut self.network, &mut self.network,
), ),
BlockProcessType::SingleBlob { id } => self BlockProcessType::SingleBlob { id } => self
.block_lookups .block_lookups
.single_block_component_processed::<BlobRequestState<Current, T::EthSpec>>( .single_block_component_processed::<BlobRequestState<T::EthSpec>>(
id, id,
result, result,
&mut self.network, &mut self.network,
@@ -834,7 +833,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
Ok((block, seen_timestamp)) => match id.lookup_type { Ok((block, seen_timestamp)) => match id.lookup_type {
LookupType::Current => self LookupType::Current => self
.block_lookups .block_lookups
.single_lookup_response::<BlockRequestState<Current>>( .single_lookup_response::<BlockRequestState>(
id, id,
peer_id, peer_id,
block, block,
@@ -843,7 +842,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
), ),
LookupType::Parent => self LookupType::Parent => self
.block_lookups .block_lookups
.parent_lookup_response::<BlockRequestState<Parent>>( .parent_lookup_response::<BlockRequestState>(
id, id,
peer_id, peer_id,
block, block,
@@ -854,7 +853,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
Err(error) => match id.lookup_type { Err(error) => match id.lookup_type {
LookupType::Current => self LookupType::Current => self
.block_lookups .block_lookups
.single_block_lookup_failed::<BlockRequestState<Current>>( .single_block_lookup_failed::<BlockRequestState>(
id, id,
&peer_id, &peer_id,
&mut self.network, &mut self.network,
@@ -862,7 +861,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
), ),
LookupType::Parent => self LookupType::Parent => self
.block_lookups .block_lookups
.parent_lookup_failed::<BlockRequestState<Parent>>( .parent_lookup_failed::<BlockRequestState>(
id, id,
&peer_id, &peer_id,
&mut self.network, &mut self.network,
@@ -909,7 +908,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
Ok((blobs, seen_timestamp)) => match id.lookup_type { Ok((blobs, seen_timestamp)) => match id.lookup_type {
LookupType::Current => self LookupType::Current => self
.block_lookups .block_lookups
.single_lookup_response::<BlobRequestState<Current, T::EthSpec>>( .single_lookup_response::<BlobRequestState<T::EthSpec>>(
id, id,
peer_id, peer_id,
blobs, blobs,
@@ -918,7 +917,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
), ),
LookupType::Parent => self LookupType::Parent => self
.block_lookups .block_lookups
.parent_lookup_response::<BlobRequestState<Parent, T::EthSpec>>( .parent_lookup_response::<BlobRequestState<T::EthSpec>>(
id, id,
peer_id, peer_id,
blobs, blobs,
@@ -930,7 +929,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
Err(error) => match id.lookup_type { Err(error) => match id.lookup_type {
LookupType::Current => self LookupType::Current => self
.block_lookups .block_lookups
.single_block_lookup_failed::<BlobRequestState<Current, T::EthSpec>>( .single_block_lookup_failed::<BlobRequestState<T::EthSpec>>(
id, id,
&peer_id, &peer_id,
&mut self.network, &mut self.network,
@@ -938,7 +937,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
), ),
LookupType::Parent => self LookupType::Parent => self
.block_lookups .block_lookups
.parent_lookup_failed::<BlobRequestState<Parent, T::EthSpec>>( .parent_lookup_failed::<BlobRequestState<T::EthSpec>>(
id, id,
&peer_id, &peer_id,
&mut self.network, &mut self.network,