process single block and blob

This commit is contained in:
realbigsean
2022-11-30 11:51:18 -05:00
parent fc9d0a512d
commit 2157d91b43
14 changed files with 179 additions and 209 deletions

View File

@@ -1,10 +1,11 @@
use crate::sync::manager::{BlockTy, Id};
use crate::sync::manager::Id;
use lighthouse_network::rpc::methods::BlocksByRangeRequest;
use lighthouse_network::PeerId;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};
use std::ops::Sub;
use std::sync::Arc;
use types::signed_block_and_blobs::BlockWrapper;
use types::{Epoch, EthSpec, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, Slot};
/// The number of times to retry a batch before it is considered failed.
@@ -19,6 +20,21 @@ pub enum BatchTy<T: EthSpec> {
BlocksAndBlobs(Vec<SignedBeaconBlockAndBlobsSidecar<T>>),
}
impl<T: EthSpec> BatchTy<T> {
pub fn into_wrapped_blocks(self) -> Vec<BlockWrapper<T>> {
match self {
BatchTy::Blocks(blocks) => blocks
.into_iter()
.map(|block| BlockWrapper::Block { block })
.collect(),
BatchTy::BlocksAndBlobs(block_sidecar_pair) => block_sidecar_pair
.into_iter()
.map(|block_sidecar_pair| BlockWrapper::BlockAndBlob { block_sidecar_pair })
.collect(),
}
}
}
/// Error representing a batch with mixed block types.
#[derive(Debug)]
pub struct MixedBlockTyErr;
@@ -63,7 +79,7 @@ pub trait BatchConfig {
/// Note that simpler hashing functions considered in the past (hash of first block, hash of last
/// block, number of received blocks) are not good enough to differentiate attempts. For this
/// reason, we hash the complete set of blocks both in RangeSync and BackFillSync.
fn batch_attempt_hash<T: EthSpec>(blocks: &[BlockTy<T>]) -> u64;
fn batch_attempt_hash<T: EthSpec>(blocks: &[BlockWrapper<T>]) -> u64;
}
pub struct RangeSyncBatchConfig {}
@@ -75,7 +91,7 @@ impl BatchConfig for RangeSyncBatchConfig {
fn max_batch_processing_attempts() -> u8 {
MAX_BATCH_PROCESSING_ATTEMPTS
}
fn batch_attempt_hash<T: EthSpec>(blocks: &[BlockTy<T>]) -> u64 {
fn batch_attempt_hash<T: EthSpec>(blocks: &[BlockWrapper<T>]) -> u64 {
let mut hasher = std::collections::hash_map::DefaultHasher::new();
blocks.hash(&mut hasher);
hasher.finish()
@@ -123,9 +139,9 @@ pub enum BatchState<T: EthSpec> {
/// The batch has failed either downloading or processing, but can be requested again.
AwaitingDownload,
/// The batch is being downloaded.
Downloading(PeerId, Vec<BlockTy<T>>, Id),
Downloading(PeerId, Vec<BlockWrapper<T>>, Id),
/// The batch has been completely downloaded and is ready for processing.
AwaitingProcessing(PeerId, Vec<BlockTy<T>>),
AwaitingProcessing(PeerId, Vec<BlockWrapper<T>>),
/// The batch is being processed.
Processing(Attempt),
/// The batch was successfully processed and is waiting to be validated.
@@ -258,7 +274,7 @@ impl<T: EthSpec, B: BatchConfig> BatchInfo<T, B> {
}
/// Adds a block to a downloading batch.
pub fn add_block(&mut self, block: BlockTy<T>) -> Result<(), WrongState> {
pub fn add_block(&mut self, block: BlockWrapper<T>) -> Result<(), WrongState> {
match self.state.poison() {
BatchState::Downloading(peer, mut blocks, req_id) => {
blocks.push(block);
@@ -397,7 +413,7 @@ impl<T: EthSpec, B: BatchConfig> BatchInfo<T, B> {
match self.batch_type {
ExpectedBatchTy::OnlyBlockBlobs => {
let blocks = blocks.into_iter().map(|block| {
let BlockTy::BlockAndBlob { block_sidecar_pair: block_and_blob } = block else {
let BlockWrapper::BlockAndBlob { block_sidecar_pair: block_and_blob } = block else {
panic!("Batches should never have a mixed type. This is a bug. Contact D")
};
block_and_blob
@@ -406,7 +422,7 @@ impl<T: EthSpec, B: BatchConfig> BatchInfo<T, B> {
}
ExpectedBatchTy::OnlyBlock => {
let blocks = blocks.into_iter().map(|block| {
let BlockTy::Block { block } = block else {
let BlockWrapper::Block { block } = block else {
panic!("Batches should never have a mixed type. This is a bug. Contact D")
};
block
@@ -507,7 +523,7 @@ pub struct Attempt {
}
impl Attempt {
fn new<B: BatchConfig, T: EthSpec>(peer_id: PeerId, blocks: &[BlockTy<T>]) -> Self {
fn new<B: BatchConfig, T: EthSpec>(peer_id: PeerId, blocks: &[BlockWrapper<T>]) -> Self {
let hash = B::batch_attempt_hash(blocks);
Attempt { peer_id, hash }
}

View File

@@ -1,7 +1,6 @@
use super::batch::{BatchInfo, BatchProcessingResult, BatchState};
use super::BatchTy;
use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent};
use crate::sync::manager::BlockTy;
use crate::sync::{
manager::Id, network_context::SyncNetworkContext, BatchOperationOutcome, BatchProcessResult,
};
@@ -12,6 +11,7 @@ use rand::seq::SliceRandom;
use slog::{crit, debug, o, warn};
use std::collections::{btree_map::Entry, BTreeMap, HashSet};
use std::hash::{Hash, Hasher};
use types::signed_block_and_blobs::BlockWrapper;
use types::{Epoch, EthSpec, Hash256, Slot};
/// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of
@@ -226,7 +226,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
batch_id: BatchId,
peer_id: &PeerId,
request_id: Id,
beacon_block: Option<BlockTy<T::EthSpec>>,
beacon_block: Option<BlockWrapper<T::EthSpec>>,
) -> ProcessingResult {
// check if we have this batch
let batch = match self.batches.get_mut(&batch_id) {
@@ -327,12 +327,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
let process_id = ChainSegmentProcessId::RangeBatchId(self.id, batch_id, count_unrealized);
self.current_processing_batch = Some(batch_id);
let work_event = match blocks {
BatchTy::Blocks(blocks) => BeaconWorkEvent::chain_segment(process_id, blocks),
BatchTy::BlocksAndBlobs(blocks_and_blobs) => {
BeaconWorkEvent::blob_chain_segment(process_id, blocks_and_blobs)
}
};
let work_event = BeaconWorkEvent::chain_segment(process_id, blocks.into_wrapped_blocks());
if let Err(e) = beacon_processor_send.try_send(work_event) {
crit!(self.log, "Failed to send chain segment to processor."; "msg" => "process_batch",

View File

@@ -44,7 +44,7 @@ use super::chain::{BatchId, ChainId, RemoveChain, SyncingChain};
use super::chain_collection::ChainCollection;
use super::sync_type::RangeSyncType;
use crate::status::ToStatusMessage;
use crate::sync::manager::{BlockTy, Id};
use crate::sync::manager::Id;
use crate::sync::network_context::SyncNetworkContext;
use crate::sync::BatchProcessResult;
use beacon_chain::{BeaconChain, BeaconChainTypes};
@@ -55,6 +55,7 @@ use lru_cache::LRUTimeCache;
use slog::{crit, debug, trace, warn};
use std::collections::HashMap;
use std::sync::Arc;
use types::signed_block_and_blobs::BlockWrapper;
use types::{Epoch, EthSpec, Hash256, Slot};
/// For how long we store failed finalized chains to prevent retries.
@@ -202,7 +203,7 @@ where
chain_id: ChainId,
batch_id: BatchId,
request_id: Id,
beacon_block: Option<BlockTy<T::EthSpec>>,
beacon_block: Option<BlockWrapper<T::EthSpec>>,
) {
// check if this chunk removes the chain
match self.chains.call_by_id(chain_id, |chain| {