Revert "Revert "renames, remove , wrap BlockWrapper enum to make descontruction private""

This reverts commit 1931a442dc.
This commit is contained in:
realbigsean
2022-12-28 10:31:18 -05:00
parent 1931a442dc
commit 8a70d80a2f
19 changed files with 231 additions and 268 deletions

View File

@@ -1,9 +1,9 @@
//! Provides network functionality for the Syncing thread. This fundamentally wraps a network
//! channel and stores a global RPC ID to perform requests.
use super::block_sidecar_coupling::BlockBlobRequestInfo;
use super::block_sidecar_coupling::BlocksAndBlobsRequestInfo;
use super::manager::{Id, RequestId as SyncRequestId};
use super::range_sync::{BatchId, ChainId, ExpectedBatchTy};
use super::range_sync::{BatchId, ByRangeRequestType, ChainId};
use crate::beacon_processor::WorkEvent;
use crate::service::{NetworkMessage, RequestId};
use crate::status::ToStatusMessage;
@@ -38,11 +38,12 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
backfill_requests: FnvHashMap<Id, BatchId>,
/// BlocksByRange requests paired with BlobsByRange requests made by the range.
range_sidecar_pair_requests:
FnvHashMap<Id, (ChainId, BatchId, BlockBlobRequestInfo<T::EthSpec>)>,
range_blocks_and_blobs_requests:
FnvHashMap<Id, (ChainId, BatchId, BlocksAndBlobsRequestInfo<T::EthSpec>)>,
/// BlocksByRange requests paired with BlobsByRange requests made by the backfill sync.
backfill_sidecar_pair_requests: FnvHashMap<Id, (BatchId, BlockBlobRequestInfo<T::EthSpec>)>,
backfill_blocks_and_blobs_requests:
FnvHashMap<Id, (BatchId, BlocksAndBlobsRequestInfo<T::EthSpec>)>,
/// Whether the ee is online. If it's not, we don't allow access to the
/// `beacon_processor_send`.
@@ -58,20 +59,20 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
}
/// Small enumeration to make dealing with block and blob requests easier.
pub enum BlockOrBlob<T: EthSpec> {
pub enum BlockOrBlobs<T: EthSpec> {
Block(Option<Arc<SignedBeaconBlock<T>>>),
Blob(Option<Arc<BlobsSidecar<T>>>),
Blobs(Option<Arc<BlobsSidecar<T>>>),
}
impl<T: EthSpec> From<Option<Arc<SignedBeaconBlock<T>>>> for BlockOrBlob<T> {
impl<T: EthSpec> From<Option<Arc<SignedBeaconBlock<T>>>> for BlockOrBlobs<T> {
fn from(block: Option<Arc<SignedBeaconBlock<T>>>) -> Self {
BlockOrBlob::Block(block)
BlockOrBlobs::Block(block)
}
}
impl<T: EthSpec> From<Option<Arc<BlobsSidecar<T>>>> for BlockOrBlob<T> {
impl<T: EthSpec> From<Option<Arc<BlobsSidecar<T>>>> for BlockOrBlobs<T> {
fn from(blob: Option<Arc<BlobsSidecar<T>>>) -> Self {
BlockOrBlob::Blob(blob)
BlockOrBlobs::Blobs(blob)
}
}
@@ -89,8 +90,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
request_id: 1,
range_requests: Default::default(),
backfill_requests: Default::default(),
range_sidecar_pair_requests: Default::default(),
backfill_sidecar_pair_requests: Default::default(),
range_blocks_and_blobs_requests: Default::default(),
backfill_blocks_and_blobs_requests: Default::default(),
execution_engine_state: EngineState::Online, // always assume `Online` at the start
beacon_processor_send,
chain,
@@ -140,13 +141,13 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
pub fn blocks_by_range_request(
&mut self,
peer_id: PeerId,
batch_type: ExpectedBatchTy,
batch_type: ByRangeRequestType,
request: BlocksByRangeRequest,
chain_id: ChainId,
batch_id: BatchId,
) -> Result<Id, &'static str> {
match batch_type {
ExpectedBatchTy::OnlyBlock => {
ByRangeRequestType::Blocks => {
trace!(
self.log,
"Sending BlocksByRange request";
@@ -156,7 +157,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
);
let request = Request::BlocksByRange(request);
let id = self.next_id();
let request_id = RequestId::Sync(SyncRequestId::RangeSync { id });
let request_id = RequestId::Sync(SyncRequestId::RangeBlocks { id });
self.send_network_msg(NetworkMessage::SendRequest {
peer_id,
request,
@@ -165,7 +166,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.range_requests.insert(id, (chain_id, batch_id));
Ok(id)
}
ExpectedBatchTy::OnlyBlockBlobs => {
ByRangeRequestType::BlocksAndBlobs => {
debug!(
self.log,
"Sending BlocksByRange and BlobsByRange requests";
@@ -176,7 +177,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
// create the shared request id. This is fine since the rpc handles substream ids.
let id = self.next_id();
let request_id = RequestId::Sync(SyncRequestId::RangeSidecarPair { id });
let request_id = RequestId::Sync(SyncRequestId::RangeBlobs { id });
// Create the blob request based on the blob request.
let blobs_request = Request::BlobsByRange(BlobsByRangeRequest {
@@ -196,8 +197,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
request: blobs_request,
request_id,
})?;
let block_blob_info = BlockBlobRequestInfo::default();
self.range_sidecar_pair_requests
let block_blob_info = BlocksAndBlobsRequestInfo::default();
self.range_blocks_and_blobs_requests
.insert(id, (chain_id, batch_id, block_blob_info));
Ok(id)
}
@@ -208,12 +209,12 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
pub fn backfill_blocks_by_range_request(
&mut self,
peer_id: PeerId,
batch_type: ExpectedBatchTy,
batch_type: ByRangeRequestType,
request: BlocksByRangeRequest,
batch_id: BatchId,
) -> Result<Id, &'static str> {
match batch_type {
ExpectedBatchTy::OnlyBlock => {
ByRangeRequestType::Blocks => {
trace!(
self.log,
"Sending backfill BlocksByRange request";
@@ -223,7 +224,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
);
let request = Request::BlocksByRange(request);
let id = self.next_id();
let request_id = RequestId::Sync(SyncRequestId::BackFillSync { id });
let request_id = RequestId::Sync(SyncRequestId::BackFillBlocks { id });
self.send_network_msg(NetworkMessage::SendRequest {
peer_id,
request,
@@ -232,7 +233,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.backfill_requests.insert(id, batch_id);
Ok(id)
}
ExpectedBatchTy::OnlyBlockBlobs => {
ByRangeRequestType::BlocksAndBlobs => {
debug!(
self.log,
"Sending backfill BlocksByRange and BlobsByRange requests";
@@ -243,7 +244,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
// create the shared request id. This is fine since the rpc handles substream ids.
let id = self.next_id();
let request_id = RequestId::Sync(SyncRequestId::BackFillSidecarPair { id });
let request_id = RequestId::Sync(SyncRequestId::BackFillBlobs { id });
// Create the blob request based on the blob request.
let blobs_request = Request::BlobsByRange(BlobsByRangeRequest {
@@ -263,8 +264,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
request: blobs_request,
request_id,
})?;
let block_blob_info = BlockBlobRequestInfo::default();
self.backfill_sidecar_pair_requests
let block_blob_info = BlocksAndBlobsRequestInfo::default();
self.backfill_blocks_and_blobs_requests
.insert(id, (batch_id, block_blob_info));
Ok(id)
}
@@ -288,18 +289,18 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
pub fn range_sync_block_and_blob_response(
&mut self,
request_id: Id,
block_or_blob: BlockOrBlob<T::EthSpec>,
block_or_blob: BlockOrBlobs<T::EthSpec>,
) -> Option<(
ChainId,
BatchId,
Result<Vec<BlockWrapper<T::EthSpec>>, &'static str>,
)> {
match self.range_sidecar_pair_requests.entry(request_id) {
match self.range_blocks_and_blobs_requests.entry(request_id) {
Entry::Occupied(mut entry) => {
let (_, _, info) = entry.get_mut();
match block_or_blob {
BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block),
BlockOrBlob::Blob(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar),
BlockOrBlobs::Block(maybe_block) => info.add_block_response(maybe_block),
BlockOrBlobs::Blobs(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar),
}
if info.is_finished() {
// If the request is finished, dequeue everything
@@ -316,28 +317,28 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
pub fn range_sync_request_failed(
&mut self,
request_id: Id,
batch_type: ExpectedBatchTy,
batch_type: ByRangeRequestType,
) -> Option<(ChainId, BatchId)> {
match batch_type {
ExpectedBatchTy::OnlyBlockBlobs => self
.range_sidecar_pair_requests
ByRangeRequestType::BlocksAndBlobs => self
.range_blocks_and_blobs_requests
.remove(&request_id)
.map(|(chain_id, batch_id, _info)| (chain_id, batch_id)),
ExpectedBatchTy::OnlyBlock => self.range_requests.remove(&request_id),
ByRangeRequestType::Blocks => self.range_requests.remove(&request_id),
}
}
pub fn backfill_request_failed(
&mut self,
request_id: Id,
batch_type: ExpectedBatchTy,
batch_type: ByRangeRequestType,
) -> Option<BatchId> {
match batch_type {
ExpectedBatchTy::OnlyBlockBlobs => self
.backfill_sidecar_pair_requests
ByRangeRequestType::BlocksAndBlobs => self
.backfill_blocks_and_blobs_requests
.remove(&request_id)
.map(|(batch_id, _info)| batch_id),
ExpectedBatchTy::OnlyBlock => self.backfill_requests.remove(&request_id),
ByRangeRequestType::Blocks => self.backfill_requests.remove(&request_id),
}
}
@@ -360,14 +361,14 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
pub fn backfill_sync_block_and_blob_response(
&mut self,
request_id: Id,
block_or_blob: BlockOrBlob<T::EthSpec>,
block_or_blob: BlockOrBlobs<T::EthSpec>,
) -> Option<(BatchId, Result<Vec<BlockWrapper<T::EthSpec>>, &'static str>)> {
match self.backfill_sidecar_pair_requests.entry(request_id) {
match self.backfill_blocks_and_blobs_requests.entry(request_id) {
Entry::Occupied(mut entry) => {
let (_, info) = entry.get_mut();
match block_or_blob {
BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block),
BlockOrBlob::Blob(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar),
BlockOrBlobs::Block(maybe_block) => info.add_block_response(maybe_block),
BlockOrBlobs::Blobs(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar),
}
if info.is_finished() {
// If the request is finished, dequeue everything
@@ -533,7 +534,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
/// Check whether a batch for this epoch (and only this epoch) should request just blocks or
/// blocks and blobs.
pub fn batch_type(&self, epoch: types::Epoch) -> ExpectedBatchTy {
pub fn batch_type(&self, epoch: types::Epoch) -> ByRangeRequestType {
const _: () = assert!(
super::backfill_sync::BACKFILL_EPOCHS_PER_BATCH == 1
&& super::range_sync::EPOCHS_PER_BATCH == 1,
@@ -542,18 +543,18 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
#[cfg(test)]
{
// Keep tests only for blocks.
return ExpectedBatchTy::OnlyBlock;
return ByRangeRequestType::Blocks;
}
#[cfg(not(test))]
{
if let Some(data_availability_boundary) = self.chain.data_availability_boundary() {
if epoch >= data_availability_boundary {
ExpectedBatchTy::OnlyBlockBlobs
ByRangeRequestType::BlocksAndBlobs
} else {
ExpectedBatchTy::OnlyBlock
ByRangeRequestType::Blocks
}
} else {
ExpectedBatchTy::OnlyBlock
ByRangeRequestType::Blocks
}
}
}