smol cleanup and a bugfix

This commit is contained in:
realbigsean
2023-08-01 18:31:01 -04:00
parent 6bcfaf4fde
commit 4215160bc1
2 changed files with 38 additions and 47 deletions

View File

@@ -318,7 +318,6 @@ impl TestRig {
} }
pub fn enqueue_single_lookup_rpc_blobs(&self) { pub fn enqueue_single_lookup_rpc_blobs(&self) {
if let Some(blobs) = self.next_blobs.clone() { if let Some(blobs) = self.next_blobs.clone() {
dbg!(blobs.len());
let blobs = FixedBlobSidecarList::from( let blobs = FixedBlobSidecarList::from(
blobs blobs
.into_iter() .into_iter()
@@ -1004,7 +1003,6 @@ async fn test_rpc_block_reprocessing() {
rig.enqueue_single_lookup_rpc_blobs(); rig.enqueue_single_lookup_rpc_blobs();
if rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0) > 0 { if rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0) > 0 {
dbg!("here");
rig.assert_event_journal(&[RPC_BLOBS, WORKER_FREED, NOTHING_TO_DO]) rig.assert_event_journal(&[RPC_BLOBS, WORKER_FREED, NOTHING_TO_DO])
.await; .await;
} }

View File

@@ -5,6 +5,7 @@ use super::BatchProcessResult;
use super::{manager::BlockProcessType, network_context::SyncNetworkContext}; use super::{manager::BlockProcessType, network_context::SyncNetworkContext};
use crate::metrics; use crate::metrics;
use crate::network_beacon_processor::ChainSegmentProcessId; use crate::network_beacon_processor::ChainSegmentProcessId;
use crate::sync::block_lookups::common::LookupType;
use crate::sync::block_lookups::parent_lookup::{ParentLookup, RequestError}; use crate::sync::block_lookups::parent_lookup::{ParentLookup, RequestError};
use crate::sync::block_lookups::single_block_lookup::{ use crate::sync::block_lookups::single_block_lookup::{
CachedChild, LookupRequestError, LookupVerifyError, CachedChild, LookupRequestError, LookupVerifyError,
@@ -16,7 +17,6 @@ use beacon_chain::validator_monitor::timestamp_now;
use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError};
pub use common::Current; pub use common::Current;
pub use common::Lookup; pub use common::Lookup;
use common::LookupType;
pub use common::Parent; pub use common::Parent;
pub use common::RequestState; pub use common::RequestState;
use fnv::FnvHashMap; use fnv::FnvHashMap;
@@ -359,16 +359,10 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
/// necessary because we want to make sure all parents are processed before sending a child /// necessary because we want to make sure all parents are processed before sending a child
/// for processing, otherwise the block will fail validation and will be returned to the network /// for processing, otherwise the block will fail validation and will be returned to the network
/// layer with an `UnknownParent` error. /// layer with an `UnknownParent` error.
pub fn has_pending_parent_request(&self, target_id: Id) -> bool { pub fn has_pending_parent_request(&self, block_root: Hash256) -> bool {
self.single_block_lookups.iter().any(|(id, lookup)| {
if *id == target_id {
self.parent_lookups self.parent_lookups
.iter() .iter()
.any(|parent_lookup| parent_lookup.chain_hash() == lookup.block_root()) .any(|parent_lookup| parent_lookup.chain_hash() == block_root)
} else {
false
}
})
} }
/// Process a block or blob response received from a single lookup request. /// Process a block or blob response received from a single lookup request.
@@ -383,8 +377,6 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
let id = lookup_id.id; let id = lookup_id.id;
let response_type = R::response_type(); let response_type = R::response_type();
let chain_hash_opt = self.has_pending_parent_request(id);
let Some(lookup) = self.get_single_lookup::<R>(lookup_id) else { let Some(lookup) = self.get_single_lookup::<R>(lookup_id) else {
if response.is_some() { if response.is_some() {
warn!( warn!(
@@ -398,14 +390,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
let expected_block_root = lookup.block_root(); let expected_block_root = lookup.block_root();
match self.single_lookup_response_inner::<R>( match self.single_lookup_response_inner::<R>(peer_id, response, seen_timestamp, cx, lookup)
peer_id, {
response,
seen_timestamp,
cx,
chain_hash_opt,
lookup,
) {
Ok(lookup) => { Ok(lookup) => {
self.single_block_lookups.insert(id, lookup); self.single_block_lookups.insert(id, lookup);
} }
@@ -432,7 +418,6 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
response: Option<R::ResponseType>, response: Option<R::ResponseType>,
seen_timestamp: Duration, seen_timestamp: Duration,
cx: &SyncNetworkContext<T>, cx: &SyncNetworkContext<T>,
delay_send: bool,
mut lookup: SingleBlockLookup<Current, T>, mut lookup: SingleBlockLookup<Current, T>,
) -> Result<SingleBlockLookup<Current, T>, LookupRequestError> { ) -> Result<SingleBlockLookup<Current, T>, LookupRequestError> {
let response_type = R::response_type(); let response_type = R::response_type();
@@ -445,8 +430,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
self.handle_verified_response::<Current, R>( self.handle_verified_response::<Current, R>(
seen_timestamp, seen_timestamp,
cx, cx,
delay_send, BlockProcessType::SingleBlock { id: lookup.id },
None,
verified_response, verified_response,
&mut lookup, &mut lookup,
)?; )?;
@@ -481,8 +465,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
&self, &self,
seen_timestamp: Duration, seen_timestamp: Duration,
cx: &SyncNetworkContext<T>, cx: &SyncNetworkContext<T>,
delay_send: bool, process_type: BlockProcessType,
chain_hash: Option<Hash256>,
verified_response: R::VerifiedResponseType, verified_response: R::VerifiedResponseType,
lookup: &mut SingleBlockLookup<L, T>, lookup: &mut SingleBlockLookup<L, T>,
) -> Result<(), LookupRequestError> { ) -> Result<(), LookupRequestError> {
@@ -494,15 +477,17 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
.component_downloaded = true; .component_downloaded = true;
let cached_child = lookup.add_response::<R>(verified_response.clone()); let cached_child = lookup.add_response::<R>(verified_response.clone());
match cached_child { match cached_child {
CachedChild::Ok(block) => { CachedChild::Ok(block) => {
if let Some(chain_hash) = chain_hash { // If we have an outstanding parent request for this block, delay sending the response until
if !delay_send { // all parent blocks have been processed, otherwise we will fail validation with an
let process_type = match L::lookup_type() { // `UnknownParent`.
LookupType::Parent => BlockProcessType::ParentLookup { chain_hash }, let delay_send = match L::lookup_type() {
LookupType::Current => BlockProcessType::SingleBlock { id }, LookupType::Parent => false,
LookupType::Current => self.has_pending_parent_request(lookup.block_root()),
}; };
if !delay_send {
self.send_block_for_processing( self.send_block_for_processing(
block_root, block_root,
block, block,
@@ -512,7 +497,6 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
)? )?
} }
} }
}
CachedChild::DownloadIncomplete => { CachedChild::DownloadIncomplete => {
// If this was the result of a block request, we can't determined if the block peer // If this was the result of a block request, we can't determined if the block peer
// did anything wrong. If we already had both a block and blobs response processed, // did anything wrong. If we already had both a block and blobs response processed,
@@ -628,8 +612,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
self.handle_verified_response::<Parent, R>( self.handle_verified_response::<Parent, R>(
seen_timestamp, seen_timestamp,
cx, cx,
false, BlockProcessType::ParentLookup {
Some(parent_lookup.chain_hash()), chain_hash: parent_lookup.chain_hash(),
},
verified_response, verified_response,
&mut parent_lookup.current_parent_request, &mut parent_lookup.current_parent_request,
)?; )?;
@@ -1181,7 +1166,11 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
"chain_hash" => ?chain_hash "chain_hash" => ?chain_hash
); );
child_lookup.handle_consistency_failure(cx); child_lookup.handle_consistency_failure(cx);
if child_lookup.request_block_and_blobs(cx).is_err() { if let Err(e) = child_lookup.request_block_and_blobs(cx) {
debug!(self.log,
"Failed to request block and blobs, dropping lookup";
"error" => ?e
);
self.single_block_lookups.remove(&child_lookup_id); self.single_block_lookups.remove(&child_lookup_id);
} }
} }
@@ -1313,7 +1302,11 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
"error" => ?e "error" => ?e
); );
lookup.handle_consistency_failure(cx); lookup.handle_consistency_failure(cx);
if lookup.request_block_and_blobs(cx).is_err() { if let Err(e) = lookup.request_block_and_blobs(cx) {
debug!(self.log,
"Failed to request block and blobs, droppingn lookup";
"error" => ?e
);
self.single_block_lookups.remove(&id); self.single_block_lookups.remove(&id);
} }
} }