start fixing some compile errors

This commit is contained in:
realbigsean
2023-04-17 16:58:18 -04:00
parent 8618c301b5
commit 195d802931
9 changed files with 174 additions and 163 deletions

View File

@@ -53,13 +53,8 @@ pub(crate) struct BlockLookups<T: BeaconChainTypes> {
/// Parent chain lookups being downloaded.
parent_lookups: SmallVec<[ParentLookup<T>; 3]>,
processing_parent_lookups: HashMap<
Hash256,
(
Vec<Hash256>,
SingleBlockLookup<PARENT_FAIL_TOLERANCE, T::EthSpec>,
),
>,
processing_parent_lookups:
HashMap<Hash256, (Vec<Hash256>, SingleBlockLookup<PARENT_FAIL_TOLERANCE, T>)>,
/// A cache of failed chain lookups to prevent duplicate searches.
failed_chains: LRUTimeCache<Hash256>,
@@ -68,10 +63,11 @@ pub(crate) struct BlockLookups<T: BeaconChainTypes> {
/// received or not.
///
/// The flag allows us to determine if the peer returned data or sent us nothing.
single_block_lookups:
FnvHashMap<Id, SingleBlockLookup<SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, T::EthSpec>>,
blob_ids_to_block_ids: HashMap<Id, Id>,
single_block_lookups: Vec<(
Option<Id>,
Option<Id>,
SingleBlockLookup<SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, T>,
)>,
da_checker: Arc<DataAvailabilityChecker<T::EthSpec, T::SlotClock>>,
@@ -96,7 +92,6 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
)),
single_block_lookups: Default::default(),
da_checker,
blob_ids_to_block_ids: Default::default(),
log,
}
}
@@ -119,8 +114,10 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// Do not re-request a block that is already being requested
if self
.single_block_lookups
.values_mut()
.any(|single_block_request| single_block_request.add_peer(&hash, &peer_id))
.iter_mut()
.any(|(block_id, blob_id, single_block_request)| {
single_block_request.add_peer(&hash, &peer_id)
})
{
return;
}
@@ -152,27 +149,27 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
let mut single_block_request = SingleBlockLookup::new(hash, peer_id, da_checker);
cache_fn(&mut single_block_request);
let (peer_id, block_request) = single_block_request
.request_block()
.expect("none of the possible failure cases apply for a newly created block lookup");
let (peer_id, blob_request) = single_block_request
.request_blobs()
.expect("none of the possible failure cases apply for a newly created blob lookup");
let block_request_id =
if let Ok(Some((peer_id, block_request))) = single_block_request.request_block() {
cx.single_block_lookup_request(peer_id, block_request).ok()
} else {
None
};
if let (Ok(request_id), Ok(blob_request_id)) = (
cx.single_block_lookup_request(peer_id, block_request),
cx.single_blobs_lookup_request(peer_id, blob_request),
) {
self.single_block_lookups
.insert(request_id, single_block_request);
self.blob_ids_to_block_ids
.insert(blob_request_id, request_id);
let blob_request_id =
if let Ok(Some((peer_id, blob_request))) = single_block_request.request_blobs() {
cx.single_blobs_lookup_request(peer_id, blob_request).ok()
} else {
None
};
metrics::set_gauge(
&metrics::SYNC_SINGLE_BLOCK_LOOKUPS,
self.single_block_lookups.len() as i64,
);
}
self.single_block_lookups
.push((block_request_id, blob_request_id, single_block_request));
metrics::set_gauge(
&metrics::SYNC_SINGLE_BLOCK_LOOKUPS,
self.single_block_lookups.len() as i64,
);
}
pub fn search_current_unknown_parent(
@@ -182,7 +179,14 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
peer_id: PeerId,
cx: &mut SyncNetworkContext<T>,
) {
self.search_block_with(|request| request.add_block(block), block_root, peer_id, cx);
self.search_block_with(
|request| {
let _ = request.add_block_wrapper(block_root, block);
},
block_root,
peer_id,
cx,
);
}
/// If a block is attempted to be processed but we do not know its parent, this function is
@@ -474,6 +478,10 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
match parent_lookup.verify_blob(blob, &mut self.failed_chains) {
Ok(Some(blobs)) => {
let block_root = blobs
.first()
.map(|blob| blob.block_root)
.unwrap_or(parent_lookup.chain_hash());
let processed_or_search = parent_lookup.add_blobs(blobs);
match processed_or_search {
@@ -695,7 +703,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
error!(self.log, "Beacon chain error processing single block"; "block_root" => %root, "error" => ?e);
}
BlockError::ParentUnknown(block) => {
self.search_parent(root, block, peer_id, cx);
self.search_parent(root, block.parent_root(), peer_id, cx);
}
ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => {
// These errors indicate that the execution layer is offline
@@ -964,13 +972,14 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
let response = parent_lookup
.request_parent_block(cx)
.and_then(|| parent_lookup.request_parent_blobs(cx));
self.handle_response(parent_lookup, response);
self.handle_response(parent_lookup, cx, response);
}
//TODO(sean) how should peer scoring work with failures in this method?
fn handle_response(
&mut self,
mut parent_lookup: ParentLookup<T>,
cx: &mut SyncNetworkContext<T>,
result: Result<(), parent_lookup::RequestError>,
) {
match result {

View File

@@ -35,7 +35,7 @@ pub(crate) struct ParentLookup<T: BeaconChainTypes> {
/// The blocks that have currently been downloaded.
downloaded_blocks: Vec<DownlodedBlocks<T::EthSpec>>,
/// Request of the last parent.
pub current_parent_request: SingleBlockLookup<PARENT_FAIL_TOLERANCE, T::EthSpec>,
pub current_parent_request: SingleBlockLookup<PARENT_FAIL_TOLERANCE, T>,
/// Id of the last parent request.
current_parent_request_id: Option<Id>,
current_parent_blob_request_id: Option<Id>,
@@ -101,30 +101,31 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
return Err(RequestError::ChainTooLong);
}
let (peer_id, request) = self.current_parent_request.make_request()?;
match cx.parent_lookup_block_request(peer_id, request) {
Ok(request_id) => {
self.current_parent_request_id = Some(request_id);
Ok(())
}
Err(reason) => {
self.current_parent_request_id = None;
Err(RequestError::SendFailed(reason))
if let Some((peer_id, request)) = self.current_parent_request.request_block()? {
match cx.parent_lookup_block_request(peer_id, request) {
Ok(request_id) => {
self.current_parent_request_id = Some(request_id);
Ok(())
}
Err(reason) => {
self.current_parent_request_id = None;
Err(RequestError::SendFailed(reason))
}
}
}
Ok(())
}
pub fn request_parent_blobs(
&mut self,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), RequestError> {
if let Some(blob_req) = self.current_parent_request.as_mut() {
// check to make sure this request hasn't failed
if self.downloaded_blocks.len() >= PARENT_DEPTH_TOLERANCE {
return Err(RequestError::ChainTooLong);
}
// check to make sure this request hasn't failed
if self.downloaded_blocks.len() >= PARENT_DEPTH_TOLERANCE {
return Err(RequestError::ChainTooLong);
}
let (peer_id, request) = blob_req.request_blobs()?;
if let Some((peer_id, request)) = self.current_parent_request.request_blobs()? {
match cx.parent_lookup_blobs_request(peer_id, request) {
Ok(request_id) => {
self.current_parent_blob_request_id = Some(request_id);
@@ -183,14 +184,13 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
Hash256,
Vec<MaybeAvailableBlock<T::EthSpec>>,
Vec<Hash256>,
SingleBlockLookup<PARENT_FAIL_TOLERANCE>,
SingleBlockLookup<PARENT_FAIL_TOLERANCE, T>,
Option<SingleBlobsRequest<PARENT_FAIL_TOLERANCE, T::EthSpec>>,
) {
let ParentLookup {
chain_hash,
downloaded_blocks,
current_parent_request,
current_parent_blob_request,
current_parent_request_id: _,
current_parent_blob_request_id: _,
} = self;
@@ -253,7 +253,9 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
.map(|(_, block)| block.parent_root())
{
if failed_chains.contains(&parent_root) {
self.current_parent_request.register_failure_downloading();
self.current_parent_request
.block_request_state
.register_failure_downloading();
self.current_parent_request_id = None;
return Err(VerifyError::PreviousFailure { parent_root });
}
@@ -267,11 +269,7 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
blob: Option<Arc<BlobSidecar<T::EthSpec>>>,
failed_chains: &mut lru_cache::LRUTimeCache<Hash256>,
) -> Result<Option<Vec<Arc<BlobSidecar<T::EthSpec>>>>, VerifyError> {
let blobs = self
.current_parent_blob_request
.map(|mut req| req.verify_blob(blob))
.transpose()?
.flatten();
let blobs = self.current_parent_request.verify_blob(blob)?;
// check if the parent of this block isn't in the failed cache. If it is, this chain should
// be dropped and the peer downscored.
@@ -281,7 +279,8 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
.map(|blob| blob.block_parent_root)
{
if failed_chains.contains(&parent_root) {
self.current_parent_blob_request
self.current_parent_request
.blob_request_state
.register_failure_downloading();
self.current_parent_blob_request_id = None;
return Err(VerifyError::PreviousFailure { parent_root });

View File

@@ -23,7 +23,7 @@ use types::{BlobSidecar, SignedBeaconBlock};
pub struct SingleBlockLookup<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> {
pub requested_block_root: Hash256,
pub requested_ids: Vec<BlobIdentifier>,
pub downloaded_blobs: Vec<Arc<BlobSidecar<T::EthSpec>>>,
pub downloaded_blobs: Vec<Option<Arc<BlobSidecar<T::EthSpec>>>>,
pub downloaded_block: Option<Arc<BlobSidecar<T::EthSpec>>>,
pub block_request_state: SingleLookupRequestState<MAX_ATTEMPTS>,
pub blob_request_state: SingleLookupRequestState<MAX_ATTEMPTS>,
@@ -83,8 +83,6 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
peer_id: PeerId,
da_checker: Arc<DataAvailabilityChecker<T::EthSpec, T::SlotClock>>,
) -> Self {
da_checker.get_missing_parts_for_hash(&requested_block_root);
Self {
requested_block_root,
requested_ids: vec![],
@@ -99,8 +97,8 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
pub fn add_blobs(
&mut self,
block_root: Hash256,
blobs: Vec<Arc<BlobSidecar<T>>>,
) -> RequestResult<T> {
blobs: Vec<Arc<BlobSidecar<T::EthSpec>>>,
) -> RequestResult<T::EthSpec> {
//TODO(sean) smart extend, we don't want dupes
self.downloaded_blobs.extend(blobs);
@@ -118,11 +116,35 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
pub fn add_block(
&mut self,
block_root: Hash256,
block: Arc<SignedBeaconBlock<T>>,
) -> RequestResult<T> {
block: Arc<SignedBeaconBlock<T::EthSpec>>,
) -> RequestResult<T::EthSpec> {
//TODO(sean) check for existing block?
self.downloaded_block = Some(block);
match self
.da_checker
.zip_block(block_root, block, self.downloaded_blobs)
{
Ok(wrapper) => RequestResult::Process(wrapper),
Err(AvailabilityCheckError::MissingBlobs) => RequestResult::SearchBlock(block_root),
_ => todo!(),
}
}
pub fn add_block_wrapper(
&mut self,
block_root: Hash256,
block: BlockWrapper<T::EthSpec>,
) -> RequestResult<T::EthSpec> {
match block {
BlockWrapper::Block(block) => self.add_block(block_root, block),
BlockWrapper::BlockAndBlobs(block, blobs) => {
//TODO(sean) check for existing block?
self.downloaded_block = Some(block);
self.add_blobs(block_root, blobs)
}
}
match self.da_checker.zip_block(block_root, block, blobs) {
Ok(wrapper) => RequestResult::Process(wrapper),
Err(AvailabilityCheckError::MissingBlobs) => RequestResult::SearchBlock(block_root),
@@ -134,8 +156,8 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
/// Returns the block for processing if the response is what we expected.
pub fn verify_block(
&mut self,
block: Option<Arc<SignedBeaconBlock<T>>>,
) -> Result<Option<RootBlockTuple<T>>, VerifyError> {
block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
) -> Result<Option<RootBlockTuple<T::EthSpec>>, VerifyError> {
match self.block_request_state.state {
State::AwaitingDownload => {
self.block_request_state.register_failure_downloading();
@@ -178,37 +200,10 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
}
}
pub fn request_block(&mut self) -> Result<(PeerId, BlocksByRootRequest), LookupRequestError> {
debug_assert!(matches!(
self.block_request_state.state,
State::AwaitingDownload
));
if self.failed_attempts() >= MAX_ATTEMPTS {
Err(LookupRequestError::TooManyAttempts {
cannot_process: self.block_request_state.failed_processing
>= self.block_request_state.failed_downloading,
})
} else if let Some(&peer_id) = self
.block_request_state
.available_peers
.iter()
.choose(&mut rand::thread_rng())
{
let request = BlocksByRootRequest {
block_roots: VariableList::from(vec![self.requested_block_root]),
};
self.block_request_state.state = State::Downloading { peer_id };
self.block_request_state.used_peers.insert(peer_id);
Ok((peer_id, request))
} else {
Err(LookupRequestError::NoPeers)
}
}
pub fn verify_blob<T: EthSpec>(
pub fn verify_blob(
&mut self,
blob: Option<Arc<BlobSidecar<T>>>,
) -> Result<Option<Vec<Arc<BlobSidecar<T>>>>, BlobVerifyError> {
blob: Option<Arc<BlobSidecar<T::EthSpec>>>,
) -> Result<Option<Vec<Arc<BlobSidecar<T::EthSpec>>>>, BlobVerifyError> {
match self.block_request_state.state {
State::AwaitingDownload => {
self.blob_request_state.register_failure_downloading();
@@ -246,7 +241,46 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
}
}
pub fn request_blobs(&mut self) -> Result<(PeerId, BlobsByRootRequest), LookupRequestError> {
pub fn request_block(
&mut self,
) -> Result<Option<(PeerId, BlocksByRootRequest)>, LookupRequestError> {
if self.da_checker.has_block(self.requested_block_root) || self.downloaded_block.is_some() {
return Ok(None);
}
debug_assert!(matches!(
self.block_request_state.state,
State::AwaitingDownload
));
if self.failed_attempts() >= MAX_ATTEMPTS {
Err(LookupRequestError::TooManyAttempts {
cannot_process: self.block_request_state.failed_processing
>= self.block_request_state.failed_downloading,
})
} else if let Some(&peer_id) = self
.block_request_state
.available_peers
.iter()
.choose(&mut rand::thread_rng())
{
let request = BlocksByRootRequest {
block_roots: VariableList::from(vec![self.requested_block_root]),
};
self.block_request_state.state = State::Downloading { peer_id };
self.block_request_state.used_peers.insert(peer_id);
Ok(Some((peer_id, request)))
} else {
Err(LookupRequestError::NoPeers)
}
}
pub fn request_blobs(
&mut self,
) -> Result<Option<(PeerId, BlobsByRootRequest)>, LookupRequestError> {
if self.da_checker.has_block(self.requested_block_root) || self.downloaded_block.is_some() {
return Ok(None);
}
debug_assert!(matches!(
self.block_request_state.state,
State::AwaitingDownload
@@ -267,7 +301,7 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
};
self.blob_request_state.state = State::Downloading { peer_id };
self.blob_request_state.used_peers.insert(peer_id);
Ok((peer_id, request))
Ok(Some((peer_id, request)))
} else {
Err(LookupRequestError::NoPeers)
}
@@ -338,7 +372,9 @@ impl<const MAX_ATTEMPTS: u8> SingleLookupRequestState<MAX_ATTEMPTS> {
}
}
impl<const MAX_ATTEMPTS: u8> slog::Value for SingleBlockLookup<MAX_ATTEMPTS> {
impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> slog::Value
for SingleBlockLookup<MAX_ATTEMPTS, T>
{
fn serialize(
&self,
record: &slog::Record,

View File

@@ -120,7 +120,7 @@ pub enum SyncMessage<T: EthSpec> {
},
/// A block with an unknown parent has been received.
UnknownBlock(PeerId, MaybeAvailableBlock<T>, Hash256),
UnknownBlock(PeerId, BlockWrapper<T>, Hash256),
/// A peer has sent an attestation that references a block that is unknown. This triggers the
/// manager to attempt to find the block matching the unknown hash.
@@ -246,15 +246,18 @@ pub fn spawn<T: BeaconChainTypes>(
log.clone(),
),
range_sync: RangeSync::new(beacon_chain.clone(), log.clone()),
backfill_sync: BackFillSync::new(beacon_chain, network_globals, log.clone()),
block_lookups: BlockLookups::new(log.clone()),
backfill_sync: BackFillSync::new(beacon_chain.clone(), network_globals, log.clone()),
block_lookups: BlockLookups::new(
beacon_chain.data_availability_checker.clone(),
log.clone(),
),
delayed_lookups: delayed_lookups_send,
log: log.clone(),
};
executor.spawn(
async move {
let slot_duration = slot_clock.slot_duration();
let slot_duration = beacon_chain.slot_clock.slot_duration();
// TODO(sean) think about what this should be
let delay = beacon_chain.slot_clock.unagg_attestation_production_delay();
@@ -346,7 +349,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
RequestId::ParentLookup { id } => {
self.block_lookups
.parent_lookup_failed(id, peer_id, &mut self.network, eror);
.parent_lookup_failed(id, peer_id, &mut self.network, error);
}
RequestId::BackFillBlocks { id } => {
if let Some(batch_id) = self