chain segment processing for blobs

This commit is contained in:
realbigsean
2022-11-30 09:40:15 -05:00
parent 8b56446b64
commit 422d145902
8 changed files with 136 additions and 91 deletions

View File

@@ -61,6 +61,7 @@ use std::time::Duration;
use std::{cmp, collections::HashSet};
use task_executor::TaskExecutor;
use tokio::sync::mpsc;
use types::signed_block_and_blobs::BlockWrapper;
use types::{
Attestation, AttesterSlashing, Hash256, ProposerSlashing, SignedAggregateAndProof,
SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, SignedBlsToExecutionChange,
@@ -1762,8 +1763,13 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
/*
* Verification for a chain segment (multiple blocks).
*/
Work::ChainSegment { process_id, blocks } => task_spawner
.spawn_async(async move { worker.process_chain_segment(process_id, blocks).await }),
Work::ChainSegment { process_id, blocks } => task_spawner.spawn_async(async move {
let wrapped = blocks
.into_iter()
.map(|block| BlockWrapper::Block { block })
.collect();
worker.process_chain_segment(process_id, wrapped).await
}),
/*
* Processing of Status Messages.
*/
@@ -1867,9 +1873,13 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
process_id,
blocks_and_blobs,
} => task_spawner.spawn_async(async move {
worker
.process_blob_chain_segment(process_id, blocks_and_blobs)
.await
let wrapped = blocks_and_blobs
.into_iter()
.map(|b| BlockWrapper::BlockAndBlob {
block_sidecar_pair: b,
})
.collect();
worker.process_chain_segment(process_id, wrapped).await
}),
};
}

View File

@@ -15,7 +15,11 @@ use lighthouse_network::PeerAction;
use slog::{debug, error, info, warn};
use std::sync::Arc;
use tokio::sync::mpsc;
use types::{Epoch, Hash256, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar};
use types::signed_block_and_blobs::BlockWrapper;
use types::{
Epoch, Hash256, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar,
SignedBeaconBlockAndBlobsSidecarDecode,
};
/// Id associated to a batch processing request, either a sync batch or a parent lookup.
#[derive(Clone, Debug, PartialEq)]
@@ -126,7 +130,7 @@ impl<T: BeaconChainTypes> Worker<T> {
pub async fn process_chain_segment(
&self,
sync_type: ChainSegmentProcessId,
downloaded_blocks: Vec<Arc<SignedBeaconBlock<T::EthSpec>>>,
downloaded_blocks: Vec<BlockWrapper<T::EthSpec>>,
) {
let result = match sync_type {
// this a request from the range sync
@@ -176,7 +180,18 @@ impl<T: BeaconChainTypes> Worker<T> {
let end_slot = downloaded_blocks.last().map(|b| b.slot().as_u64());
let sent_blocks = downloaded_blocks.len();
match self.process_backfill_blocks(downloaded_blocks) {
let unwrapped = downloaded_blocks
.into_iter()
.map(|block| match block {
BlockWrapper::Block { block } => block,
//FIXME(sean) handle blobs in backfill
BlockWrapper::BlockAndBlob {
block_sidecar_pair: _,
} => todo!(),
})
.collect();
match self.process_backfill_blocks(unwrapped) {
(_, Ok(_)) => {
debug!(self.log, "Backfill batch processed";
"batch_epoch" => epoch,
@@ -241,24 +256,13 @@ impl<T: BeaconChainTypes> Worker<T> {
self.send_sync_message(SyncMessage::BatchProcessed { sync_type, result });
}
pub async fn process_blob_chain_segment(
&self,
sync_type: ChainSegmentProcessId,
downloaded_blocks: Vec<SignedBeaconBlockAndBlobsSidecar<T::EthSpec>>,
) {
warn!(self.log, "FAKE PROCESSING A BLOBS SEGMENT!!!");
let result = BatchProcessResult::Success {
was_non_empty: !downloaded_blocks.is_empty(),
};
self.send_sync_message(SyncMessage::BatchProcessed { sync_type, result });
}
/// Helper function to process blocks batches which only consumes the chain and blocks to process.
async fn process_blocks<'a>(
&self,
downloaded_blocks: impl Iterator<Item = &'a Arc<SignedBeaconBlock<T::EthSpec>>>,
downloaded_blocks: impl Iterator<Item = &'a BlockWrapper<T::EthSpec>>,
count_unrealized: CountUnrealized,
) -> (usize, Result<(), ChainSegmentFailed>) {
let blocks: Vec<Arc<_>> = downloaded_blocks.cloned().collect();
let blocks: Vec<_> = downloaded_blocks.cloned().collect();
match self
.chain
.process_chain_segment(blocks, count_unrealized)