This commit is contained in:
Diva M
2022-11-28 14:13:12 -05:00
parent f4babdedd5
commit 805df307f6
7 changed files with 157 additions and 72 deletions

View File

@@ -18,7 +18,7 @@ use self::{
single_block_lookup::SingleBlockRequest,
};
use super::manager::BlockProcessResult;
use super::manager::{BlockProcessResult, BlockTy};
use super::BatchProcessResult;
use super::{
manager::{BlockProcessType, Id},
@@ -30,7 +30,7 @@ mod single_block_lookup;
#[cfg(test)]
mod tests;
pub type RootBlockTuple<T> = (Hash256, Arc<SignedBeaconBlock<T>>);
pub type RootBlockTuple<T> = (Hash256, BlockTy<T>);
const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60;
const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 3;
@@ -87,8 +87,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
let mut single_block_request = SingleBlockRequest::new(hash, peer_id);
//FIXME(sean) remove unwrap?
let (peer_id, request) = single_block_request.request_block().unwrap();
let (peer_id, request) = single_block_request
.request_block()
.expect("none of the possible failure cases apply for a newly created block lookup");
if let Ok(request_id) = cx.single_block_lookup_request(peer_id, request) {
self.single_block_lookups
.insert(request_id, single_block_request);
@@ -105,7 +106,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
pub fn search_parent(
&mut self,
block_root: Hash256,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
block: BlockTy<T::EthSpec>,
peer_id: PeerId,
cx: &mut SyncNetworkContext<T>,
) {
@@ -120,7 +121,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// Make sure this block is not already downloaded, and that neither it or its parent is
// being searched for.
if self.parent_queue.iter_mut().any(|parent_req| {
parent_req.contains_block(&block)
parent_req.contains_block(block.block())
|| parent_req.add_peer(&block_root, &peer_id)
|| parent_req.add_peer(&parent_root, &peer_id)
}) {
@@ -138,7 +139,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
&mut self,
id: Id,
peer_id: PeerId,
block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
block: Option<BlockTy<T::EthSpec>>,
seen_timestamp: Duration,
cx: &mut SyncNetworkContext<T>,
) {
@@ -203,7 +204,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
&mut self,
id: Id,
peer_id: PeerId,
block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
block: Option<BlockTy<T::EthSpec>>,
seen_timestamp: Duration,
cx: &mut SyncNetworkContext<T>,
) {
@@ -425,7 +426,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
error!(self.log, "Beacon chain error processing single block"; "block_root" => %root, "error" => ?e);
}
BlockError::ParentUnknown(block) => {
self.search_parent(root, block, peer_id, cx);
self.search_parent(root, BlockTy::Block { block }, peer_id, cx);
}
ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => {
// These errors indicate that the execution layer is offline
@@ -505,7 +506,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
BlockProcessResult::Err(BlockError::ParentUnknown(block)) => {
// need to keep looking for parents
// add the block back to the queue and continue the search
parent_lookup.add_block(block);
parent_lookup.add_block(BlockTy::Block { block });
self.request_parent(parent_lookup, cx);
}
BlockProcessResult::Ok
@@ -524,8 +525,10 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
let chain_hash = parent_lookup.chain_hash();
let blocks = parent_lookup.chain_blocks();
let process_id = ChainSegmentProcessId::ParentLookup(chain_hash);
// let work = WorkEvent::chain_segment(process_id, blocks);
let work = todo!("this means we can have batches of mixed type");
match beacon_processor_send.try_send(WorkEvent::chain_segment(process_id, blocks)) {
match beacon_processor_send.try_send(work) {
Ok(_) => {
self.parent_queue.push(parent_lookup);
}
@@ -631,7 +634,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
fn send_block_for_processing(
&mut self,
block_root: Hash256,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
block: BlockTy<T::EthSpec>,
duration: Duration,
process_type: BlockProcessType,
cx: &mut SyncNetworkContext<T>,
@@ -639,7 +642,15 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
match cx.processor_channel_if_enabled() {
Some(beacon_processor_send) => {
trace!(self.log, "Sending block for processing"; "block" => ?block_root, "process" => ?process_type);
let event = WorkEvent::rpc_beacon_block(block_root, block, duration, process_type);
let event = match block {
BlockTy::Block { block } => {
WorkEvent::rpc_beacon_block(block_root, block, duration, process_type)
}
BlockTy::BlockAndBlob { block_sidecar_pair } => {
// WorkEvent::rpc_block_and_glob(block_sidecar_pair)
todo!("we also need to process block-glob pairs for rpc")
}
};
if let Err(e) = beacon_processor_send.try_send(event) {
error!(
self.log,