deal with rpc blobs in groups per block in the da checker. don't cache missing blob ids in the da checker.

This commit is contained in:
realbigsean
2023-04-04 10:29:07 -04:00
parent 22694d1c89
commit 6f12df37cf
8 changed files with 247 additions and 79 deletions

View File

@@ -1,5 +1,5 @@
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;
@@ -19,6 +19,7 @@ use types::{BlobSidecar, SignedBeaconBlock};
use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent};
use crate::metrics;
use crate::sync::block_lookups::single_block_lookup::SingleBlobRequest;
use self::parent_lookup::PARENT_FAIL_TOLERANCE;
use self::{
@@ -59,6 +60,8 @@ pub(crate) struct BlockLookups<T: BeaconChainTypes> {
/// The flag allows us to determine if the peer returned data or sent us nothing.
single_block_lookups: FnvHashMap<Id, SingleBlockRequest<SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS>>,
single_blob_lookups: FnvHashMap<Id, SingleBlobRequest<SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS>>,
/// The logger for the import manager.
log: Logger,
}
@@ -72,6 +75,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
FAILED_CHAINS_CACHE_EXPIRY_SECONDS,
)),
single_block_lookups: Default::default(),
single_blob_lookups: Default::default(),
log,
}
}
@@ -137,6 +141,56 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
cx: &mut SyncNetworkContext<T>,
) {
todo!()
//
// let hash = Hash256::zero();
//
// // Do not re-request a blob that is already being requested
// if self
// .single_blob_lookups
// .values_mut()
// .any(|single_block_request| single_block_request.add_peer(&hash, &peer_id))
// {
// return;
// }
//
// if self.parent_lookups.iter_mut().any(|parent_req| {
// parent_req.add_peer(&hash, &peer_id) || parent_req.contains_block(&hash)
// }) {
// // If the block was already downloaded, or is being downloaded in this moment, do not
// // request it.
// return;
// }
//
// if self
// .processing_parent_lookups
// .values()
// .any(|(hashes, _last_parent_request)| hashes.contains(&hash))
// {
// // we are already processing this block, ignore it.
// return;
// }
//
// debug!(
// self.log,
// "Searching for block";
// "peer_id" => %peer_id,
// "block" => %hash
// );
//
// let mut single_block_request = SingleBlobRequest::new(hash, peer_id);
//
// let (peer_id, request) = single_block_request
// .request_block()
// .expect("none of the possible failure cases apply for a newly created block lookup");
// if let Ok(request_id) = cx.single_block_lookup_request(peer_id, request) {
// self.single_blob_lookups
// .insert(request_id, single_block_request);
//
// metrics::set_gauge(
// &metrics::SYNC_SINGLE_BLOB_LOOKUPS,
// self.single_blob_lookups.len() as i64,
// );
}
pub fn search_block_delayed(
@@ -171,6 +225,13 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
peer_id: PeerId,
cx: &mut SyncNetworkContext<T>,
) {
//
// let missing_ids = cx.chain.data_availability_checker.get_missing_blob_ids(block, Some(root));
// // TODO(sean) how do we handle this erroring?
// if let Ok(missing_ids) = missing_ids {
// self.search_blobs(missing_ids, peer_id, cx);
// }
let parent_root = block.parent_root();
// If this block or its parent is part of a known failed chain, ignore it.
if self.failed_chains.contains(&parent_root) || self.failed_chains.contains(&block_root) {
@@ -497,19 +558,17 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
};
match result {
BlockProcessResult::Ok(status) => {
match status {
AvailabilityProcessingStatus::Imported(hash) => {
trace!(self.log, "Single block processing succeeded"; "block" => %root);
}
AvailabilityProcessingStatus::PendingBlobs(blobs) => {
// trigger?
}
AvailabilityProcessingStatus::PendingBlock(hash) => {
// logic error
}
BlockProcessResult::Ok(status) => match status {
AvailabilityProcessingStatus::Imported(hash) => {
trace!(self.log, "Single block processing succeeded"; "block" => %root);
}
}
AvailabilityProcessingStatus::PendingBlobs(blobs_ids) => {
self.search_blobs(blobs_ids, peer_id, cx);
}
AvailabilityProcessingStatus::PendingBlock(hash) => {
warn!(self.log, "Block processed but returned PendingBlock"; "block" => %hash);
}
},
BlockProcessResult::Ignored => {
// Beacon processor signalled to ignore the block processing result.
// This implies that the cpu is overloaded. Drop the request.
@@ -620,13 +679,18 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
BlockProcessResult::Ok(AvailabilityProcessingStatus::PendingBlock(_)) => {
// doesn't make sense
}
BlockProcessResult::Ok(AvailabilityProcessingStatus::PendingBlobs(blobs)) => {
// trigger
BlockProcessResult::Ok(AvailabilityProcessingStatus::PendingBlobs(blobs_ids)) => {
self.search_blobs(blobs_ids, peer_id, cx);
}
BlockProcessResult::Err(BlockError::ParentUnknown(block)) => {
// need to keep looking for parents
// add the block back to the queue and continue the search
// TODO(sean) how do we handle this erroring?
let missing_ids = cx
.chain
.data_availability_checker
.get_missing_blob_ids(block.clone(), None)
.unwrap_or_default();
parent_lookup.add_block(block);
self.search_blobs(missing_ids, peer_id, cx);
self.request_parent(parent_lookup, cx);
}
BlockProcessResult::Ok(AvailabilityProcessingStatus::Imported(_))