Revert "renames, remove , wrap BlockWrapper enum to make descontruction private"

This reverts commit 5b3b34a9d7.
This commit is contained in:
realbigsean
2022-12-28 10:30:36 -05:00
parent 5b3b34a9d7
commit 1931a442dc
19 changed files with 268 additions and 231 deletions

View File

@@ -1699,7 +1699,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
message_id,
peer_id,
peer_client,
block.into(),
BlockWrapper::Block(block),
work_reprocessing_tx,
duplicate_cache,
seen_timestamp,
@@ -1721,7 +1721,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
message_id,
peer_id,
peer_client,
block_sidecar_pair.into(),
BlockWrapper::BlockAndBlob(block_sidecar_pair),
work_reprocessing_tx,
duplicate_cache,
seen_timestamp,

View File

@@ -230,10 +230,10 @@ impl<T: BeaconChainTypes> Worker<T> {
Ok((Some(block), Some(blobs))) => {
self.send_response(
peer_id,
Response::BlobsByRoot(Some(SignedBeaconBlockAndBlobsSidecar {
Response::BlobsByRoot(Some(Arc::new(SignedBeaconBlockAndBlobsSidecar {
beacon_block: block,
blobs_sidecar: blobs,
})),
}))),
request_id,
);
send_block_count += 1;

View File

@@ -188,7 +188,14 @@ impl<T: BeaconChainTypes> Worker<T> {
let end_slot = downloaded_blocks.last().map(|b| b.slot().as_u64());
let sent_blocks = downloaded_blocks.len();
let unwrapped = downloaded_blocks.into_iter().map(|_| todo!()).collect();
let unwrapped = downloaded_blocks
.into_iter()
.map(|block| match block {
BlockWrapper::Block(block) => block,
//FIXME(sean) handle blobs in backfill
BlockWrapper::BlockAndBlob(_) => todo!(),
})
.collect();
match self.process_backfill_blocks(unwrapped) {
(_, Ok(_)) => {

View File

@@ -223,10 +223,10 @@ impl<T: BeaconChainTypes> Processor<T> {
SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. } => {
unreachable!("Block lookups do not request BBRange requests")
}
id @ (SyncId::BackFillBlocks { .. }
| SyncId::RangeBlocks { .. }
| SyncId::BackFillBlobs { .. }
| SyncId::RangeBlobs { .. }) => id,
id @ (SyncId::BackFillSync { .. }
| SyncId::RangeSync { .. }
| SyncId::BackFillSidecarPair { .. }
| SyncId::RangeSidecarPair { .. }) => id,
},
RequestId::Router => unreachable!("All BBRange requests belong to sync"),
};
@@ -258,7 +258,7 @@ impl<T: BeaconChainTypes> Processor<T> {
);
if let RequestId::Sync(id) = request_id {
self.send_to_sync(SyncMessage::RpcBlobs {
self.send_to_sync(SyncMessage::RpcGlob {
peer_id,
request_id: id,
blob_sidecar,
@@ -282,10 +282,10 @@ impl<T: BeaconChainTypes> Processor<T> {
let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id {
id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id,
SyncId::BackFillBlocks { .. }
| SyncId::RangeBlocks { .. }
| SyncId::RangeBlobs { .. }
| SyncId::BackFillBlobs { .. } => {
SyncId::BackFillSync { .. }
| SyncId::RangeSync { .. }
| SyncId::RangeSidecarPair { .. }
| SyncId::BackFillSidecarPair { .. } => {
unreachable!("Batch syncing do not request BBRoot requests")
}
},
@@ -310,15 +310,15 @@ impl<T: BeaconChainTypes> Processor<T> {
&mut self,
peer_id: PeerId,
request_id: RequestId,
block_and_blobs: Option<SignedBeaconBlockAndBlobsSidecar<T::EthSpec>>,
block_and_blobs: Option<Arc<SignedBeaconBlockAndBlobsSidecar<T::EthSpec>>>,
) {
let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id {
id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id,
SyncId::BackFillBlocks { .. }
| SyncId::RangeBlocks { .. }
| SyncId::RangeBlobs { .. }
| SyncId::BackFillBlobs { .. } => {
SyncId::BackFillSync { .. }
| SyncId::RangeSync { .. }
| SyncId::RangeSidecarPair { .. }
| SyncId::BackFillSidecarPair { .. } => {
unreachable!("Batch syncing does not request BBRoot requests")
}
},
@@ -330,7 +330,7 @@ impl<T: BeaconChainTypes> Processor<T> {
"Received BlockAndBlobssByRoot Response";
"peer" => %peer_id,
);
self.send_to_sync(SyncMessage::RpcBlockAndBlobs {
self.send_to_sync(SyncMessage::RpcBlockAndGlob {
peer_id,
request_id,
block_and_blobs,

View File

@@ -536,7 +536,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
let process_id = ChainSegmentProcessId::BackSyncBatchId(batch_id);
self.current_processing_batch = Some(batch_id);
let work_event = BeaconWorkEvent::chain_segment(process_id, blocks);
let work_event = BeaconWorkEvent::chain_segment(process_id, blocks.into_wrapped_blocks());
if let Err(e) = network.processor_channel().try_send(work_event) {
crit!(self.log, "Failed to send backfill segment to processor."; "msg" => "process_batch",
"error" => %e, "batch" => self.processing_target);

View File

@@ -1,9 +1,12 @@
use std::{collections::VecDeque, sync::Arc};
use types::{signed_block_and_blobs::BlockWrapper, BlobsSidecar, EthSpec, SignedBeaconBlock};
use types::{
signed_block_and_blobs::BlockWrapper, BlobsSidecar, EthSpec, SignedBeaconBlock,
SignedBeaconBlockAndBlobsSidecar,
};
#[derive(Debug, Default)]
pub struct BlocksAndBlobsRequestInfo<T: EthSpec> {
pub struct BlockBlobRequestInfo<T: EthSpec> {
/// Blocks we have received awaiting for their corresponding sidecar.
accumulated_blocks: VecDeque<Arc<SignedBeaconBlock<T>>>,
/// Sidecars we have received awaiting for their corresponding block.
@@ -14,7 +17,7 @@ pub struct BlocksAndBlobsRequestInfo<T: EthSpec> {
is_sidecars_stream_terminated: bool,
}
impl<T: EthSpec> BlocksAndBlobsRequestInfo<T> {
impl<T: EthSpec> BlockBlobRequestInfo<T> {
pub fn add_block_response(&mut self, maybe_block: Option<Arc<SignedBeaconBlock<T>>>) {
match maybe_block {
Some(block) => self.accumulated_blocks.push_back(block),
@@ -30,7 +33,7 @@ impl<T: EthSpec> BlocksAndBlobsRequestInfo<T> {
}
pub fn into_responses(self) -> Result<Vec<BlockWrapper<T>>, &'static str> {
let BlocksAndBlobsRequestInfo {
let BlockBlobRequestInfo {
accumulated_blocks,
mut accumulated_sidecars,
..
@@ -48,9 +51,14 @@ impl<T: EthSpec> BlocksAndBlobsRequestInfo<T> {
{
let blobs_sidecar =
accumulated_sidecars.pop_front().ok_or("missing sidecar")?;
Ok(BlockWrapper::new_with_blobs(beacon_block, blobs_sidecar))
Ok(BlockWrapper::BlockAndBlob(
SignedBeaconBlockAndBlobsSidecar {
beacon_block,
blobs_sidecar,
},
))
} else {
Ok(beacon_block.into())
Ok(BlockWrapper::Block(beacon_block))
}
})
.collect::<Result<Vec<_>, _>>();

View File

@@ -35,13 +35,13 @@
use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart};
use super::block_lookups::BlockLookups;
use super::network_context::{BlockOrBlobs, SyncNetworkContext};
use super::network_context::{BlockOrBlob, SyncNetworkContext};
use super::peer_sync_info::{remote_sync_type, PeerSyncType};
use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH};
use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent};
use crate::service::NetworkMessage;
use crate::status::ToStatusMessage;
use crate::sync::range_sync::ByRangeRequestType;
use crate::sync::range_sync::ExpectedBatchTy;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, EngineState};
use futures::StreamExt;
use lighthouse_network::rpc::methods::MAX_REQUEST_BLOCKS;
@@ -79,13 +79,13 @@ pub enum RequestId {
/// Request searching for a block's parent. The id is the chain
ParentLookup { id: Id },
/// Request was from the backfill sync algorithm.
BackFillBlocks { id: Id },
/// Backfill request for blob sidecars.
BackFillBlobs { id: Id },
BackFillSync { id: Id },
/// Backfill request for blocks and sidecars.
BackFillSidecarPair { id: Id },
/// The request was from a chain in the range sync algorithm.
RangeBlocks { id: Id },
/// The request was from a chain in range, asking for ranges blob sidecars.
RangeBlobs { id: Id },
RangeSync { id: Id },
/// The request was from a chain in range, asking for ranges of blocks and sidecars.
RangeSidecarPair { id: Id },
}
#[derive(Debug)]
@@ -103,7 +103,7 @@ pub enum SyncMessage<T: EthSpec> {
},
/// A blob has been received from the RPC.
RpcBlobs {
RpcGlob {
request_id: RequestId,
peer_id: PeerId,
blob_sidecar: Option<Arc<BlobsSidecar<T>>>,
@@ -111,10 +111,10 @@ pub enum SyncMessage<T: EthSpec> {
},
/// A block and blobs have been received from the RPC.
RpcBlockAndBlobs {
RpcBlockAndGlob {
request_id: RequestId,
peer_id: PeerId,
block_and_blobs: Option<SignedBeaconBlockAndBlobsSidecar<T>>,
block_and_blobs: Option<Arc<SignedBeaconBlockAndBlobsSidecar<T>>>,
seen_timestamp: Duration,
},
@@ -295,10 +295,10 @@ impl<T: BeaconChainTypes> SyncManager<T> {
self.block_lookups
.parent_lookup_failed(id, peer_id, &mut self.network, error);
}
RequestId::BackFillBlocks { id } => {
RequestId::BackFillSync { id } => {
if let Some(batch_id) = self
.network
.backfill_request_failed(id, ByRangeRequestType::Blocks)
.backfill_request_failed(id, ExpectedBatchTy::OnlyBlock)
{
match self
.backfill_sync
@@ -310,10 +310,10 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}
RequestId::BackFillBlobs { id } => {
RequestId::BackFillSidecarPair { id } => {
if let Some(batch_id) = self
.network
.backfill_request_failed(id, ByRangeRequestType::BlocksAndBlobs)
.backfill_request_failed(id, ExpectedBatchTy::OnlyBlockBlobs)
{
match self
.backfill_sync
@@ -324,10 +324,10 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}
}
RequestId::RangeBlocks { id } => {
RequestId::RangeSync { id } => {
if let Some((chain_id, batch_id)) = self
.network
.range_sync_request_failed(id, ByRangeRequestType::Blocks)
.range_sync_request_failed(id, ExpectedBatchTy::OnlyBlock)
{
self.range_sync.inject_error(
&mut self.network,
@@ -339,10 +339,10 @@ impl<T: BeaconChainTypes> SyncManager<T> {
self.update_sync_state()
}
}
RequestId::RangeBlobs { id } => {
RequestId::RangeSidecarPair { id } => {
if let Some((chain_id, batch_id)) = self
.network
.range_sync_request_failed(id, ByRangeRequestType::BlocksAndBlobs)
.range_sync_request_failed(id, ExpectedBatchTy::OnlyBlockBlobs)
{
self.range_sync.inject_error(
&mut self.network,
@@ -648,18 +648,18 @@ impl<T: BeaconChainTypes> SyncManager<T> {
.block_lookups
.parent_chain_processed(chain_hash, result, &mut self.network),
},
SyncMessage::RpcBlobs {
SyncMessage::RpcGlob {
request_id,
peer_id,
blob_sidecar,
seen_timestamp,
} => self.rpc_blobs_received(request_id, peer_id, blob_sidecar, seen_timestamp),
SyncMessage::RpcBlockAndBlobs {
} => self.rpc_sidecar_received(request_id, peer_id, blob_sidecar, seen_timestamp),
SyncMessage::RpcBlockAndGlob {
request_id,
peer_id,
block_and_blobs,
seen_timestamp,
} => self.rpc_block_block_and_blobs_received(
} => self.rpc_block_sidecar_pair_received(
request_id,
peer_id,
block_and_blobs,
@@ -734,18 +734,18 @@ impl<T: BeaconChainTypes> SyncManager<T> {
RequestId::SingleBlock { id } => self.block_lookups.single_block_lookup_response(
id,
peer_id,
beacon_block.map(|block| block.into()),
beacon_block.map(|block| BlockWrapper::Block(block)),
seen_timestamp,
&mut self.network,
),
RequestId::ParentLookup { id } => self.block_lookups.parent_lookup_response(
id,
peer_id,
beacon_block.map(|block| block.into()),
beacon_block.map(|block| BlockWrapper::Block(block)),
seen_timestamp,
&mut self.network,
),
RequestId::BackFillBlocks { id } => {
RequestId::BackFillSync { id } => {
let is_stream_terminator = beacon_block.is_none();
if let Some(batch_id) = self
.network
@@ -756,7 +756,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
batch_id,
&peer_id,
id,
beacon_block.map(|block| block.into()),
beacon_block.map(|block| BlockWrapper::Block(block)),
) {
Ok(ProcessResult::SyncCompleted) => self.update_sync_state(),
Ok(ProcessResult::Successful) => {}
@@ -768,7 +768,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}
}
RequestId::RangeBlocks { id } => {
RequestId::RangeSync { id } => {
let is_stream_terminator = beacon_block.is_none();
if let Some((chain_id, batch_id)) = self
.network
@@ -780,28 +780,28 @@ impl<T: BeaconChainTypes> SyncManager<T> {
chain_id,
batch_id,
id,
beacon_block.map(|block| block.into()),
beacon_block.map(|block| BlockWrapper::Block(block)),
);
self.update_sync_state();
}
}
RequestId::BackFillBlobs { id } => {
self.blobs_backfill_response(id, peer_id, beacon_block.into())
RequestId::BackFillSidecarPair { id } => {
self.block_blob_backfill_response(id, peer_id, beacon_block.into())
}
RequestId::RangeBlobs { id } => {
self.blobs_range_response(id, peer_id, beacon_block.into())
RequestId::RangeSidecarPair { id } => {
self.block_blob_range_response(id, peer_id, beacon_block.into())
}
}
}
/// Handles receiving a response for a range sync request that should have both blocks and
/// blobs.
fn blobs_range_response(
fn block_blob_range_response(
&mut self,
id: Id,
peer_id: PeerId,
block_or_blob: BlockOrBlobs<T::EthSpec>,
block_or_blob: BlockOrBlob<T::EthSpec>,
) {
if let Some((chain_id, batch_id, block_responses)) = self
.network
@@ -834,7 +834,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
"peer_id" => %peer_id, "batch_id" => batch_id, "error" => e
);
// TODO: penalize the peer for being a bad boy
let id = RequestId::RangeBlobs { id };
let id = RequestId::RangeSidecarPair { id };
self.inject_error(peer_id, id, RPCError::InvalidData(e.into()))
}
}
@@ -843,11 +843,11 @@ impl<T: BeaconChainTypes> SyncManager<T> {
/// Handles receiving a response for a Backfill sync request that should have both blocks and
/// blobs.
fn blobs_backfill_response(
fn block_blob_backfill_response(
&mut self,
id: Id,
peer_id: PeerId,
block_or_blob: BlockOrBlobs<T::EthSpec>,
block_or_blob: BlockOrBlob<T::EthSpec>,
) {
if let Some((batch_id, block_responses)) = self
.network
@@ -886,14 +886,14 @@ impl<T: BeaconChainTypes> SyncManager<T> {
"peer_id" => %peer_id, "batch_id" => batch_id, "error" => e
);
// TODO: penalize the peer for being a bad boy
let id = RequestId::BackFillBlobs { id };
let id = RequestId::BackFillSidecarPair { id };
self.inject_error(peer_id, id, RPCError::InvalidData(e.into()))
}
}
}
}
fn rpc_blobs_received(
fn rpc_sidecar_received(
&mut self,
request_id: RequestId,
peer_id: PeerId,
@@ -904,47 +904,57 @@ impl<T: BeaconChainTypes> SyncManager<T> {
RequestId::SingleBlock { .. } | RequestId::ParentLookup { .. } => {
unreachable!("There is no such thing as a singular 'by root' glob request that is not accompanied by the block")
}
RequestId::BackFillBlocks { .. } => {
RequestId::BackFillSync { .. } => {
unreachable!("An only blocks request does not receive sidecars")
}
RequestId::BackFillBlobs { id } => {
self.blobs_backfill_response(id, peer_id, maybe_sidecar.into())
RequestId::BackFillSidecarPair { id } => {
self.block_blob_backfill_response(id, peer_id, maybe_sidecar.into())
}
RequestId::RangeBlocks { .. } => {
RequestId::RangeSync { .. } => {
unreachable!("Only-blocks range requests don't receive sidecars")
}
RequestId::RangeBlobs { id } => {
self.blobs_range_response(id, peer_id, maybe_sidecar.into())
RequestId::RangeSidecarPair { id } => {
self.block_blob_range_response(id, peer_id, maybe_sidecar.into())
}
}
}
fn rpc_block_block_and_blobs_received(
fn rpc_block_sidecar_pair_received(
&mut self,
request_id: RequestId,
peer_id: PeerId,
block_sidecar_pair: Option<SignedBeaconBlockAndBlobsSidecar<T::EthSpec>>,
block_sidecar_pair: Option<Arc<SignedBeaconBlockAndBlobsSidecar<T::EthSpec>>>,
seen_timestamp: Duration,
) {
match request_id {
RequestId::SingleBlock { id } => self.block_lookups.single_block_lookup_response(
id,
peer_id,
block_sidecar_pair.map(|block_sidecar_pair| block_sidecar_pair.into()),
block_sidecar_pair.map(|block_sidecar_pair| {
BlockWrapper::BlockAndBlob(
// TODO: why is this in an arc
(*block_sidecar_pair).clone(),
)
}),
seen_timestamp,
&mut self.network,
),
RequestId::ParentLookup { id } => self.block_lookups.parent_lookup_response(
id,
peer_id,
block_sidecar_pair.map(|block_sidecar_pair| block_sidecar_pair.into()),
block_sidecar_pair.map(|block_sidecar_pair| {
BlockWrapper::BlockAndBlob(
// TODO: why is this in an arc
(*block_sidecar_pair).clone(),
)
}),
seen_timestamp,
&mut self.network,
),
RequestId::BackFillBlocks { .. }
| RequestId::BackFillBlobs { .. }
| RequestId::RangeBlocks { .. }
| RequestId::RangeBlobs { .. } => unreachable!(
RequestId::BackFillSync { .. }
| RequestId::BackFillSidecarPair { .. }
| RequestId::RangeSync { .. }
| RequestId::RangeSidecarPair { .. } => unreachable!(
"since range requests are not block-glob coupled, this should never be reachable"
),
}

View File

@@ -1,9 +1,9 @@
//! Provides network functionality for the Syncing thread. This fundamentally wraps a network
//! channel and stores a global RPC ID to perform requests.
use super::block_sidecar_coupling::BlocksAndBlobsRequestInfo;
use super::block_sidecar_coupling::BlockBlobRequestInfo;
use super::manager::{Id, RequestId as SyncRequestId};
use super::range_sync::{BatchId, ByRangeRequestType, ChainId};
use super::range_sync::{BatchId, ChainId, ExpectedBatchTy};
use crate::beacon_processor::WorkEvent;
use crate::service::{NetworkMessage, RequestId};
use crate::status::ToStatusMessage;
@@ -38,12 +38,11 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
backfill_requests: FnvHashMap<Id, BatchId>,
/// BlocksByRange requests paired with BlobsByRange requests made by the range.
range_blocks_and_blobs_requests:
FnvHashMap<Id, (ChainId, BatchId, BlocksAndBlobsRequestInfo<T::EthSpec>)>,
range_sidecar_pair_requests:
FnvHashMap<Id, (ChainId, BatchId, BlockBlobRequestInfo<T::EthSpec>)>,
/// BlocksByRange requests paired with BlobsByRange requests made by the backfill sync.
backfill_blocks_and_blobs_requests:
FnvHashMap<Id, (BatchId, BlocksAndBlobsRequestInfo<T::EthSpec>)>,
backfill_sidecar_pair_requests: FnvHashMap<Id, (BatchId, BlockBlobRequestInfo<T::EthSpec>)>,
/// Whether the ee is online. If it's not, we don't allow access to the
/// `beacon_processor_send`.
@@ -59,20 +58,20 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
}
/// Small enumeration to make dealing with block and blob requests easier.
pub enum BlockOrBlobs<T: EthSpec> {
pub enum BlockOrBlob<T: EthSpec> {
Block(Option<Arc<SignedBeaconBlock<T>>>),
Blobs(Option<Arc<BlobsSidecar<T>>>),
Blob(Option<Arc<BlobsSidecar<T>>>),
}
impl<T: EthSpec> From<Option<Arc<SignedBeaconBlock<T>>>> for BlockOrBlobs<T> {
impl<T: EthSpec> From<Option<Arc<SignedBeaconBlock<T>>>> for BlockOrBlob<T> {
fn from(block: Option<Arc<SignedBeaconBlock<T>>>) -> Self {
BlockOrBlobs::Block(block)
BlockOrBlob::Block(block)
}
}
impl<T: EthSpec> From<Option<Arc<BlobsSidecar<T>>>> for BlockOrBlobs<T> {
impl<T: EthSpec> From<Option<Arc<BlobsSidecar<T>>>> for BlockOrBlob<T> {
fn from(blob: Option<Arc<BlobsSidecar<T>>>) -> Self {
BlockOrBlobs::Blobs(blob)
BlockOrBlob::Blob(blob)
}
}
@@ -90,8 +89,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
request_id: 1,
range_requests: Default::default(),
backfill_requests: Default::default(),
range_blocks_and_blobs_requests: Default::default(),
backfill_blocks_and_blobs_requests: Default::default(),
range_sidecar_pair_requests: Default::default(),
backfill_sidecar_pair_requests: Default::default(),
execution_engine_state: EngineState::Online, // always assume `Online` at the start
beacon_processor_send,
chain,
@@ -141,13 +140,13 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
pub fn blocks_by_range_request(
&mut self,
peer_id: PeerId,
batch_type: ByRangeRequestType,
batch_type: ExpectedBatchTy,
request: BlocksByRangeRequest,
chain_id: ChainId,
batch_id: BatchId,
) -> Result<Id, &'static str> {
match batch_type {
ByRangeRequestType::Blocks => {
ExpectedBatchTy::OnlyBlock => {
trace!(
self.log,
"Sending BlocksByRange request";
@@ -157,7 +156,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
);
let request = Request::BlocksByRange(request);
let id = self.next_id();
let request_id = RequestId::Sync(SyncRequestId::RangeBlocks { id });
let request_id = RequestId::Sync(SyncRequestId::RangeSync { id });
self.send_network_msg(NetworkMessage::SendRequest {
peer_id,
request,
@@ -166,7 +165,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.range_requests.insert(id, (chain_id, batch_id));
Ok(id)
}
ByRangeRequestType::BlocksAndBlobs => {
ExpectedBatchTy::OnlyBlockBlobs => {
debug!(
self.log,
"Sending BlocksByRange and BlobsByRange requests";
@@ -177,7 +176,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
// create the shared request id. This is fine since the rpc handles substream ids.
let id = self.next_id();
let request_id = RequestId::Sync(SyncRequestId::RangeBlobs { id });
let request_id = RequestId::Sync(SyncRequestId::RangeSidecarPair { id });
// Create the blob request based on the blob request.
let blobs_request = Request::BlobsByRange(BlobsByRangeRequest {
@@ -197,8 +196,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
request: blobs_request,
request_id,
})?;
let block_blob_info = BlocksAndBlobsRequestInfo::default();
self.range_blocks_and_blobs_requests
let block_blob_info = BlockBlobRequestInfo::default();
self.range_sidecar_pair_requests
.insert(id, (chain_id, batch_id, block_blob_info));
Ok(id)
}
@@ -209,12 +208,12 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
pub fn backfill_blocks_by_range_request(
&mut self,
peer_id: PeerId,
batch_type: ByRangeRequestType,
batch_type: ExpectedBatchTy,
request: BlocksByRangeRequest,
batch_id: BatchId,
) -> Result<Id, &'static str> {
match batch_type {
ByRangeRequestType::Blocks => {
ExpectedBatchTy::OnlyBlock => {
trace!(
self.log,
"Sending backfill BlocksByRange request";
@@ -224,7 +223,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
);
let request = Request::BlocksByRange(request);
let id = self.next_id();
let request_id = RequestId::Sync(SyncRequestId::BackFillBlocks { id });
let request_id = RequestId::Sync(SyncRequestId::BackFillSync { id });
self.send_network_msg(NetworkMessage::SendRequest {
peer_id,
request,
@@ -233,7 +232,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.backfill_requests.insert(id, batch_id);
Ok(id)
}
ByRangeRequestType::BlocksAndBlobs => {
ExpectedBatchTy::OnlyBlockBlobs => {
debug!(
self.log,
"Sending backfill BlocksByRange and BlobsByRange requests";
@@ -244,7 +243,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
// create the shared request id. This is fine since the rpc handles substream ids.
let id = self.next_id();
let request_id = RequestId::Sync(SyncRequestId::BackFillBlobs { id });
let request_id = RequestId::Sync(SyncRequestId::BackFillSidecarPair { id });
// Create the blob request based on the blob request.
let blobs_request = Request::BlobsByRange(BlobsByRangeRequest {
@@ -264,8 +263,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
request: blobs_request,
request_id,
})?;
let block_blob_info = BlocksAndBlobsRequestInfo::default();
self.backfill_blocks_and_blobs_requests
let block_blob_info = BlockBlobRequestInfo::default();
self.backfill_sidecar_pair_requests
.insert(id, (batch_id, block_blob_info));
Ok(id)
}
@@ -289,18 +288,18 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
pub fn range_sync_block_and_blob_response(
&mut self,
request_id: Id,
block_or_blob: BlockOrBlobs<T::EthSpec>,
block_or_blob: BlockOrBlob<T::EthSpec>,
) -> Option<(
ChainId,
BatchId,
Result<Vec<BlockWrapper<T::EthSpec>>, &'static str>,
)> {
match self.range_blocks_and_blobs_requests.entry(request_id) {
match self.range_sidecar_pair_requests.entry(request_id) {
Entry::Occupied(mut entry) => {
let (_, _, info) = entry.get_mut();
match block_or_blob {
BlockOrBlobs::Block(maybe_block) => info.add_block_response(maybe_block),
BlockOrBlobs::Blobs(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar),
BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block),
BlockOrBlob::Blob(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar),
}
if info.is_finished() {
// If the request is finished, dequeue everything
@@ -317,28 +316,28 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
pub fn range_sync_request_failed(
&mut self,
request_id: Id,
batch_type: ByRangeRequestType,
batch_type: ExpectedBatchTy,
) -> Option<(ChainId, BatchId)> {
match batch_type {
ByRangeRequestType::BlocksAndBlobs => self
.range_blocks_and_blobs_requests
ExpectedBatchTy::OnlyBlockBlobs => self
.range_sidecar_pair_requests
.remove(&request_id)
.map(|(chain_id, batch_id, _info)| (chain_id, batch_id)),
ByRangeRequestType::Blocks => self.range_requests.remove(&request_id),
ExpectedBatchTy::OnlyBlock => self.range_requests.remove(&request_id),
}
}
pub fn backfill_request_failed(
&mut self,
request_id: Id,
batch_type: ByRangeRequestType,
batch_type: ExpectedBatchTy,
) -> Option<BatchId> {
match batch_type {
ByRangeRequestType::BlocksAndBlobs => self
.backfill_blocks_and_blobs_requests
ExpectedBatchTy::OnlyBlockBlobs => self
.backfill_sidecar_pair_requests
.remove(&request_id)
.map(|(batch_id, _info)| batch_id),
ByRangeRequestType::Blocks => self.backfill_requests.remove(&request_id),
ExpectedBatchTy::OnlyBlock => self.backfill_requests.remove(&request_id),
}
}
@@ -361,14 +360,14 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
pub fn backfill_sync_block_and_blob_response(
&mut self,
request_id: Id,
block_or_blob: BlockOrBlobs<T::EthSpec>,
block_or_blob: BlockOrBlob<T::EthSpec>,
) -> Option<(BatchId, Result<Vec<BlockWrapper<T::EthSpec>>, &'static str>)> {
match self.backfill_blocks_and_blobs_requests.entry(request_id) {
match self.backfill_sidecar_pair_requests.entry(request_id) {
Entry::Occupied(mut entry) => {
let (_, info) = entry.get_mut();
match block_or_blob {
BlockOrBlobs::Block(maybe_block) => info.add_block_response(maybe_block),
BlockOrBlobs::Blobs(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar),
BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block),
BlockOrBlob::Blob(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar),
}
if info.is_finished() {
// If the request is finished, dequeue everything
@@ -534,7 +533,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
/// Check whether a batch for this epoch (and only this epoch) should request just blocks or
/// blocks and blobs.
pub fn batch_type(&self, epoch: types::Epoch) -> ByRangeRequestType {
pub fn batch_type(&self, epoch: types::Epoch) -> ExpectedBatchTy {
const _: () = assert!(
super::backfill_sync::BACKFILL_EPOCHS_PER_BATCH == 1
&& super::range_sync::EPOCHS_PER_BATCH == 1,
@@ -543,18 +542,18 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
#[cfg(test)]
{
// Keep tests only for blocks.
return ByRangeRequestType::Blocks;
return ExpectedBatchTy::OnlyBlock;
}
#[cfg(not(test))]
{
if let Some(data_availability_boundary) = self.chain.data_availability_boundary() {
if epoch >= data_availability_boundary {
ByRangeRequestType::BlocksAndBlobs
ExpectedBatchTy::OnlyBlockBlobs
} else {
ByRangeRequestType::Blocks
ExpectedBatchTy::OnlyBlock
}
} else {
ByRangeRequestType::Blocks
ExpectedBatchTy::OnlyBlock
}
}
}

View File

@@ -4,9 +4,10 @@ use lighthouse_network::PeerId;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};
use std::ops::Sub;
use std::sync::Arc;
use strum::Display;
use types::signed_block_and_blobs::BlockWrapper;
use types::{Epoch, EthSpec, Slot};
use types::{Epoch, EthSpec, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, Slot};
/// The number of times to retry a batch before it is considered failed.
const MAX_BATCH_DOWNLOAD_ATTEMPTS: u8 = 5;
@@ -15,12 +16,36 @@ const MAX_BATCH_DOWNLOAD_ATTEMPTS: u8 = 5;
/// after `MAX_BATCH_PROCESSING_ATTEMPTS` times, it is considered faulty.
const MAX_BATCH_PROCESSING_ATTEMPTS: u8 = 3;
pub enum BatchTy<T: EthSpec> {
Blocks(Vec<Arc<SignedBeaconBlock<T>>>),
BlocksAndBlobs(Vec<SignedBeaconBlockAndBlobsSidecar<T>>),
}
impl<T: EthSpec> BatchTy<T> {
pub fn into_wrapped_blocks(self) -> Vec<BlockWrapper<T>> {
match self {
BatchTy::Blocks(blocks) => blocks
.into_iter()
.map(|block| BlockWrapper::Block(block))
.collect(),
BatchTy::BlocksAndBlobs(block_sidecar_pair) => block_sidecar_pair
.into_iter()
.map(|block_sidecar_pair| BlockWrapper::BlockAndBlob(block_sidecar_pair))
.collect(),
}
}
}
/// Error representing a batch with mixed block types.
#[derive(Debug)]
pub struct MixedBlockTyErr;
/// Type of expected batch.
#[derive(Debug, Copy, Clone, Display)]
#[strum(serialize_all = "snake_case")]
pub enum ByRangeRequestType {
BlocksAndBlobs,
Blocks,
pub enum ExpectedBatchTy {
OnlyBlockBlobs,
OnlyBlock,
}
/// Allows customisation of the above constants used in other sync methods such as BackFillSync.
@@ -106,7 +131,7 @@ pub struct BatchInfo<T: EthSpec, B: BatchConfig = RangeSyncBatchConfig> {
/// State of the batch.
state: BatchState<T>,
/// Whether this batch contains all blocks or all blocks and blobs.
batch_type: ByRangeRequestType,
batch_type: ExpectedBatchTy,
/// Pin the generic
marker: std::marker::PhantomData<B>,
}
@@ -155,7 +180,7 @@ impl<T: EthSpec, B: BatchConfig> BatchInfo<T, B> {
/// fork boundary will be of mixed type (all blocks and one last blockblob), and I don't want to
/// deal with this for now.
/// This means finalization might be slower in eip4844
pub fn new(start_epoch: &Epoch, num_of_epochs: u64, batch_type: ByRangeRequestType) -> Self {
pub fn new(start_epoch: &Epoch, num_of_epochs: u64, batch_type: ExpectedBatchTy) -> Self {
let start_slot = start_epoch.start_slot(T::slots_per_epoch());
let end_slot = start_slot + num_of_epochs * T::slots_per_epoch();
BatchInfo {
@@ -218,7 +243,7 @@ impl<T: EthSpec, B: BatchConfig> BatchInfo<T, B> {
}
/// Returns a BlocksByRange request associated with the batch.
pub fn to_blocks_by_range_request(&self) -> (BlocksByRangeRequest, ByRangeRequestType) {
pub fn to_blocks_by_range_request(&self) -> (BlocksByRangeRequest, ExpectedBatchTy) {
(
BlocksByRangeRequest {
start_slot: self.start_slot.into(),
@@ -383,11 +408,30 @@ impl<T: EthSpec, B: BatchConfig> BatchInfo<T, B> {
}
}
pub fn start_processing(&mut self) -> Result<Vec<BlockWrapper<T>>, WrongState> {
pub fn start_processing(&mut self) -> Result<BatchTy<T>, WrongState> {
match self.state.poison() {
BatchState::AwaitingProcessing(peer, blocks) => {
self.state = BatchState::Processing(Attempt::new::<B, T>(peer, &blocks));
Ok(blocks)
match self.batch_type {
ExpectedBatchTy::OnlyBlockBlobs => {
let blocks = blocks.into_iter().map(|block| {
let BlockWrapper::BlockAndBlob(block_and_blob) = block else {
panic!("Batches should never have a mixed type. This is a bug. Contact D")
};
block_and_blob
}).collect();
Ok(BatchTy::BlocksAndBlobs(blocks))
}
ExpectedBatchTy::OnlyBlock => {
let blocks = blocks.into_iter().map(|block| {
let BlockWrapper::Block(block) = block else {
panic!("Batches should never have a mixed type. This is a bug. Contact D")
};
block
}).collect();
Ok(BatchTy::Blocks(blocks))
}
}
}
BatchState::Poisoned => unreachable!("Poisoned batch"),
other => {

View File

@@ -332,7 +332,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
let process_id = ChainSegmentProcessId::RangeBatchId(self.id, batch_id, count_unrealized);
self.current_processing_batch = Some(batch_id);
let work_event = BeaconWorkEvent::chain_segment(process_id, blocks);
let work_event = BeaconWorkEvent::chain_segment(process_id, blocks.into_wrapped_blocks());
if let Err(e) = beacon_processor_send.try_send(work_event) {
crit!(self.log, "Failed to send chain segment to processor."; "msg" => "process_batch",

View File

@@ -9,8 +9,8 @@ mod range;
mod sync_type;
pub use batch::{
BatchConfig, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState,
ByRangeRequestType,
BatchConfig, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, BatchTy,
ExpectedBatchTy,
};
pub use chain::{BatchId, ChainId, EPOCHS_PER_BATCH};
pub use range::RangeSync;

View File

@@ -373,7 +373,7 @@ where
#[cfg(test)]
mod tests {
use crate::service::RequestId;
use crate::sync::range_sync::ByRangeRequestType;
use crate::sync::range_sync::ExpectedBatchTy;
use crate::NetworkMessage;
use super::*;
@@ -686,7 +686,7 @@ mod tests {
let (peer1, local_info, head_info) = rig.head_peer();
range.add_peer(&mut rig.cx, local_info, peer1, head_info);
let ((chain1, batch1), id1) = match rig.grab_request(&peer1).0 {
RequestId::Sync(crate::sync::manager::RequestId::RangeBlocks { id }) => {
RequestId::Sync(crate::sync::manager::RequestId::RangeSync { id }) => {
(rig.cx.range_sync_response(id, true).unwrap(), id)
}
other => panic!("unexpected request {:?}", other),
@@ -705,7 +705,7 @@ mod tests {
let (peer2, local_info, finalized_info) = rig.finalized_peer();
range.add_peer(&mut rig.cx, local_info, peer2, finalized_info);
let ((chain2, batch2), id2) = match rig.grab_request(&peer2).0 {
RequestId::Sync(crate::sync::manager::RequestId::RangeBlocks { id }) => {
RequestId::Sync(crate::sync::manager::RequestId::RangeSync { id }) => {
(rig.cx.range_sync_response(id, true).unwrap(), id)
}
other => panic!("unexpected request {:?}", other),