Sync lookup dedup range and blobs (#5561)

* Handle sync range blocks as blocks and blobs

* Merge range sync and backfill sync handling

* Update tests

* Add no_blobs_into_responses test

* Address @realbigsean comments

* Merge remote-tracking branch 'origin/unstable' into sync-lookup-dedup-range-and-blobs
This commit is contained in:
Lion - dapplion
2024-04-13 00:39:11 +09:00
committed by GitHub
parent 5fdd3b39bb
commit 6fb0b2ed78
7 changed files with 254 additions and 529 deletions

View File

@@ -497,10 +497,7 @@ impl<T: BeaconChainTypes> Router<T> {
crit!(self.log, "Block lookups do not request BBRange requests"; "peer_id" => %peer_id);
return;
}
id @ (SyncId::BackFillBlocks { .. }
| SyncId::RangeBlocks { .. }
| SyncId::BackFillBlockAndBlobs { .. }
| SyncId::RangeBlockAndBlobs { .. }) => id,
id @ SyncId::RangeBlockAndBlobs { .. } => id,
},
RequestId::Router => {
crit!(self.log, "All BBRange requests belong to sync"; "peer_id" => %peer_id);
@@ -559,10 +556,7 @@ impl<T: BeaconChainTypes> Router<T> {
let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id {
id @ SyncId::SingleBlock { .. } => id,
SyncId::BackFillBlocks { .. }
| SyncId::RangeBlocks { .. }
| SyncId::RangeBlockAndBlobs { .. }
| SyncId::BackFillBlockAndBlobs { .. } => {
SyncId::RangeBlockAndBlobs { .. } => {
crit!(self.log, "Batch syncing do not request BBRoot requests"; "peer_id" => %peer_id);
return;
}
@@ -604,10 +598,7 @@ impl<T: BeaconChainTypes> Router<T> {
crit!(self.log, "Block response to blobs by roots request"; "peer_id" => %peer_id);
return;
}
SyncId::BackFillBlocks { .. }
| SyncId::RangeBlocks { .. }
| SyncId::RangeBlockAndBlobs { .. }
| SyncId::BackFillBlockAndBlobs { .. } => {
SyncId::RangeBlockAndBlobs { .. } => {
crit!(self.log, "Batch syncing does not request BBRoot requests"; "peer_id" => %peer_id);
return;
}

View File

@@ -10,6 +10,7 @@
use crate::network_beacon_processor::ChainSegmentProcessId;
use crate::sync::manager::{BatchProcessResult, Id};
use crate::sync::network_context::RangeRequestId;
use crate::sync::network_context::SyncNetworkContext;
use crate::sync::range_sync::{
BatchConfig, BatchId, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState,
@@ -961,7 +962,12 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
) -> Result<(), BackFillError> {
if let Some(batch) = self.batches.get_mut(&batch_id) {
let (request, is_blob_batch) = batch.to_blocks_by_range_request();
match network.backfill_blocks_by_range_request(peer, is_blob_batch, request, batch_id) {
match network.blocks_and_blobs_by_range_request(
peer,
is_blob_batch,
request,
RangeRequestId::BackfillSync { batch_id },
) {
Ok(request_id) => {
// inform the batch about the new request
if let Err(e) = batch.start_downloading_from_peer(peer, request_id) {

View File

@@ -3,7 +3,9 @@ use ssz_types::VariableList;
use std::{collections::VecDeque, sync::Arc};
use types::{BlobSidecar, EthSpec, SignedBeaconBlock};
#[derive(Debug, Default)]
use super::range_sync::ByRangeRequestType;
#[derive(Debug)]
pub struct BlocksAndBlobsRequestInfo<E: EthSpec> {
/// Blocks we have received awaiting for their corresponding sidecar.
accumulated_blocks: VecDeque<Arc<SignedBeaconBlock<E>>>,
@@ -13,9 +15,25 @@ pub struct BlocksAndBlobsRequestInfo<E: EthSpec> {
is_blocks_stream_terminated: bool,
/// Whether the individual RPC request for sidecars is finished or not.
is_sidecars_stream_terminated: bool,
/// Used to determine if this accumulator should wait for a sidecars stream termination
request_type: ByRangeRequestType,
}
impl<E: EthSpec> BlocksAndBlobsRequestInfo<E> {
pub fn new(request_type: ByRangeRequestType) -> Self {
Self {
accumulated_blocks: <_>::default(),
accumulated_sidecars: <_>::default(),
is_blocks_stream_terminated: <_>::default(),
is_sidecars_stream_terminated: <_>::default(),
request_type,
}
}
pub fn get_request_type(&self) -> ByRangeRequestType {
self.request_type
}
pub fn add_block_response(&mut self, block_opt: Option<Arc<SignedBeaconBlock<E>>>) {
match block_opt {
Some(block) => self.accumulated_blocks.push_back(block),
@@ -78,6 +96,38 @@ impl<E: EthSpec> BlocksAndBlobsRequestInfo<E> {
}
pub fn is_finished(&self) -> bool {
self.is_blocks_stream_terminated && self.is_sidecars_stream_terminated
let blobs_requested = match self.request_type {
ByRangeRequestType::Blocks => false,
ByRangeRequestType::BlocksAndBlobs => true,
};
self.is_blocks_stream_terminated && (!blobs_requested || self.is_sidecars_stream_terminated)
}
}
#[cfg(test)]
mod tests {
use super::BlocksAndBlobsRequestInfo;
use crate::sync::range_sync::ByRangeRequestType;
use beacon_chain::test_utils::{generate_rand_block_and_blobs, NumBlobs};
use rand::SeedableRng;
use types::{test_utils::XorShiftRng, ForkName, MinimalEthSpec as E};
#[test]
fn no_blobs_into_responses() {
let mut info = BlocksAndBlobsRequestInfo::<E>::new(ByRangeRequestType::Blocks);
let mut rng = XorShiftRng::from_seed([42; 16]);
let blocks = (0..4)
.map(|_| generate_rand_block_and_blobs::<E>(ForkName::Base, NumBlobs::None, &mut rng).0)
.collect::<Vec<_>>();
// Send blocks and complete terminate response
for block in blocks {
info.add_block_response(Some(block.into()));
}
info.add_block_response(None);
// Assert response is finished and RpcBlocks can be constructed
assert!(info.is_finished());
info.into_responses().unwrap();
}
}

View File

@@ -36,7 +36,7 @@
use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart};
use super::block_lookups::common::LookupType;
use super::block_lookups::BlockLookups;
use super::network_context::{BlockOrBlob, SyncNetworkContext};
use super::network_context::{BlockOrBlob, RangeRequestId, SyncNetworkContext};
use super::peer_sync_info::{remote_sync_type, PeerSyncType};
use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH};
use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProcessor};
@@ -44,8 +44,7 @@ use crate::service::NetworkMessage;
use crate::status::ToStatusMessage;
use crate::sync::block_lookups::common::{Current, Parent};
use crate::sync::block_lookups::{BlobRequestState, BlockRequestState};
use crate::sync::network_context::BlocksAndBlobsByRangeRequest;
use crate::sync::range_sync::ByRangeRequestType;
use crate::sync::block_sidecar_coupling::BlocksAndBlobsRequestInfo;
use beacon_chain::block_verification_types::AsBlock;
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::data_availability_checker::ChildComponents;
@@ -91,12 +90,6 @@ pub enum RequestId {
SingleBlock { id: SingleLookupReqId },
/// Request searching for a set of blobs given a hash.
SingleBlob { id: SingleLookupReqId },
/// Request was from the backfill sync algorithm.
BackFillBlocks { id: Id },
/// Backfill request that is composed by both a block range request and a blob range request.
BackFillBlockAndBlobs { id: Id },
/// The request was from a chain in the range sync algorithm.
RangeBlocks { id: Id },
/// Range request that is composed by both a block range request and a blob range request.
RangeBlockAndBlobs { id: Id },
}
@@ -363,63 +356,27 @@ impl<T: BeaconChainTypes> SyncManager<T> {
error,
),
},
RequestId::BackFillBlocks { id } => {
if let Some(batch_id) = self
.network
.backfill_request_failed(id, ByRangeRequestType::Blocks)
{
match self
.backfill_sync
.inject_error(&mut self.network, batch_id, &peer_id, id)
{
Ok(_) => {}
Err(_) => self.update_sync_state(),
}
}
}
RequestId::BackFillBlockAndBlobs { id } => {
if let Some(batch_id) = self
.network
.backfill_request_failed(id, ByRangeRequestType::BlocksAndBlobs)
{
match self
.backfill_sync
.inject_error(&mut self.network, batch_id, &peer_id, id)
{
Ok(_) => {}
Err(_) => self.update_sync_state(),
}
}
}
RequestId::RangeBlocks { id } => {
if let Some((chain_id, batch_id)) = self
.network
.range_sync_request_failed(id, ByRangeRequestType::Blocks)
{
self.range_sync.inject_error(
&mut self.network,
peer_id,
batch_id,
chain_id,
id,
);
self.update_sync_state()
}
}
RequestId::RangeBlockAndBlobs { id } => {
if let Some((chain_id, batch_id)) = self
.network
.range_sync_request_failed(id, ByRangeRequestType::BlocksAndBlobs)
{
self.range_sync.inject_error(
&mut self.network,
peer_id,
batch_id,
chain_id,
id,
);
self.update_sync_state()
if let Some(sender_id) = self.network.range_request_failed(id) {
match sender_id {
RangeRequestId::RangeSync { chain_id, batch_id } => {
self.range_sync.inject_error(
&mut self.network,
peer_id,
batch_id,
chain_id,
id,
);
self.update_sync_state();
}
RangeRequestId::BackfillSync { batch_id } => match self
.backfill_sync
.inject_error(&mut self.network, batch_id, &peer_id, id)
{
Ok(_) => {}
Err(_) => self.update_sync_state(),
},
}
}
}
}
@@ -901,49 +858,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
RequestId::SingleBlob { .. } => {
crit!(self.log, "Block received during blob request"; "peer_id" => %peer_id );
}
RequestId::BackFillBlocks { id } => {
let is_stream_terminator = block.is_none();
if let Some(batch_id) = self
.network
.backfill_sync_only_blocks_response(id, is_stream_terminator)
{
match self.backfill_sync.on_block_response(
&mut self.network,
batch_id,
&peer_id,
id,
block.map(|b| RpcBlock::new_without_blobs(None, b)),
) {
Ok(ProcessResult::SyncCompleted) => self.update_sync_state(),
Ok(ProcessResult::Successful) => {}
Err(_error) => {
// The backfill sync has failed, errors are reported
// within.
self.update_sync_state();
}
}
}
}
RequestId::RangeBlocks { id } => {
let is_stream_terminator = block.is_none();
if let Some((chain_id, batch_id)) = self
.network
.range_sync_block_only_response(id, is_stream_terminator)
{
self.range_sync.blocks_by_range_response(
&mut self.network,
peer_id,
chain_id,
batch_id,
id,
block.map(|b| RpcBlock::new_without_blobs(None, b)),
);
self.update_sync_state();
}
}
RequestId::BackFillBlockAndBlobs { id } => {
self.backfill_block_and_blobs_response(id, peer_id, block.into())
}
RequestId::RangeBlockAndBlobs { id } => {
self.range_block_and_blobs_response(id, peer_id, block.into())
}
@@ -981,15 +895,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
&self.network,
),
},
RequestId::BackFillBlocks { id: _ } => {
crit!(self.log, "Blob received during backfill block request"; "peer_id" => %peer_id );
}
RequestId::RangeBlocks { id: _ } => {
crit!(self.log, "Blob received during range block request"; "peer_id" => %peer_id );
}
RequestId::BackFillBlockAndBlobs { id } => {
self.backfill_block_and_blobs_response(id, peer_id, blob.into())
}
RequestId::RangeBlockAndBlobs { id } => {
self.range_block_and_blobs_response(id, peer_id, blob.into())
}
@@ -1004,9 +909,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
peer_id: PeerId,
block_or_blob: BlockOrBlob<T::EthSpec>,
) {
if let Some((chain_id, resp)) = self
if let Some(resp) = self
.network
.range_sync_block_and_blob_response(id, block_or_blob)
.range_block_and_blob_response(id, block_or_blob)
{
match resp.responses {
Ok(blocks) => {
@@ -1016,33 +921,52 @@ impl<T: BeaconChainTypes> SyncManager<T> {
// chain the stream terminator
.chain(vec![None])
{
self.range_sync.blocks_by_range_response(
&mut self.network,
peer_id,
chain_id,
resp.batch_id,
id,
block,
);
self.update_sync_state();
match resp.sender_id {
RangeRequestId::RangeSync { chain_id, batch_id } => {
self.range_sync.blocks_by_range_response(
&mut self.network,
peer_id,
chain_id,
batch_id,
id,
block,
);
self.update_sync_state();
}
RangeRequestId::BackfillSync { batch_id } => {
match self.backfill_sync.on_block_response(
&mut self.network,
batch_id,
&peer_id,
id,
block,
) {
Ok(ProcessResult::SyncCompleted) => self.update_sync_state(),
Ok(ProcessResult::Successful) => {}
Err(_error) => {
// The backfill sync has failed, errors are reported
// within.
self.update_sync_state();
}
}
}
}
}
}
Err(e) => {
// Re-insert the request so we can retry
let new_req = BlocksAndBlobsByRangeRequest {
chain_id,
batch_id: resp.batch_id,
block_blob_info: <_>::default(),
};
self.network
.insert_range_blocks_and_blobs_request(id, new_req);
self.network.insert_range_blocks_and_blobs_request(
id,
resp.sender_id,
BlocksAndBlobsRequestInfo::new(resp.request_type),
);
// inform range that the request needs to be treated as failed
// With time we will want to downgrade this log
warn!(
self.log,
"Blocks and blobs request for range received invalid data";
"peer_id" => %peer_id,
"batch_id" => resp.batch_id,
"sender_id" => ?resp.sender_id,
"error" => e.clone()
);
let id = RequestId::RangeBlockAndBlobs { id };
@@ -1056,69 +980,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}
}
/// Handles receiving a response for a Backfill sync request that should have both blocks and
/// blobs.
fn backfill_block_and_blobs_response(
&mut self,
id: Id,
peer_id: PeerId,
block_or_blob: BlockOrBlob<T::EthSpec>,
) {
if let Some(resp) = self
.network
.backfill_sync_block_and_blob_response(id, block_or_blob)
{
match resp.responses {
Ok(blocks) => {
for block in blocks
.into_iter()
.map(Some)
// chain the stream terminator
.chain(vec![None])
{
match self.backfill_sync.on_block_response(
&mut self.network,
resp.batch_id,
&peer_id,
id,
block,
) {
Ok(ProcessResult::SyncCompleted) => self.update_sync_state(),
Ok(ProcessResult::Successful) => {}
Err(_error) => {
// The backfill sync has failed, errors are reported
// within.
self.update_sync_state();
}
}
}
}
Err(e) => {
// Re-insert the request so we can retry
self.network.insert_backfill_blocks_and_blobs_requests(
id,
resp.batch_id,
<_>::default(),
);
// inform backfill that the request needs to be treated as failed
// With time we will want to downgrade this log
warn!(
self.log, "Blocks and blobs request for backfill received invalid data";
"peer_id" => %peer_id, "batch_id" => resp.batch_id, "error" => e.clone()
);
let id = RequestId::BackFillBlockAndBlobs { id };
self.network.report_peer(
peer_id,
PeerAction::MidToleranceError,
"block_blob_faulty_backfill_batch",
);
self.inject_error(peer_id, id, RPCError::InvalidData(e))
}
}
}
}
}
impl<E: EthSpec> From<Result<AvailabilityProcessingStatus, BlockError<E>>>

View File

@@ -21,14 +21,20 @@ use tokio::sync::mpsc;
use types::{BlobSidecar, EthSpec, SignedBeaconBlock};
pub struct BlocksAndBlobsByRangeResponse<E: EthSpec> {
pub batch_id: BatchId,
pub sender_id: RangeRequestId,
pub responses: Result<Vec<RpcBlock<E>>, String>,
pub request_type: ByRangeRequestType,
}
pub struct BlocksAndBlobsByRangeRequest<E: EthSpec> {
pub chain_id: ChainId,
pub batch_id: BatchId,
pub block_blob_info: BlocksAndBlobsRequestInfo<E>,
#[derive(Debug, Clone, Copy)]
pub enum RangeRequestId {
RangeSync {
chain_id: ChainId,
batch_id: BatchId,
},
BackfillSync {
batch_id: BatchId,
},
}
/// Wraps a Network channel to employ various RPC related network functionality for the Sync manager. This includes management of a global RPC request Id.
@@ -39,18 +45,9 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
/// A sequential ID for all RPC requests.
request_id: Id,
/// BlocksByRange requests made by the range syncing algorithm.
range_requests: FnvHashMap<Id, (ChainId, BatchId)>,
/// BlocksByRange requests made by backfill syncing.
backfill_requests: FnvHashMap<Id, BatchId>,
/// BlocksByRange requests paired with BlobsByRange requests made by the range.
range_blocks_and_blobs_requests: FnvHashMap<Id, BlocksAndBlobsByRangeRequest<T::EthSpec>>,
/// BlocksByRange requests paired with BlobsByRange requests made by the backfill sync.
backfill_blocks_and_blobs_requests:
FnvHashMap<Id, (BatchId, BlocksAndBlobsRequestInfo<T::EthSpec>)>,
/// BlocksByRange requests paired with BlobsByRange
range_blocks_and_blobs_requests:
FnvHashMap<Id, (RangeRequestId, BlocksAndBlobsRequestInfo<T::EthSpec>)>,
/// Whether the ee is online. If it's not, we don't allow access to the
/// `beacon_processor_send`.
@@ -94,10 +91,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
network_send,
execution_engine_state: EngineState::Online, // always assume `Online` at the start
request_id: 1,
range_requests: FnvHashMap::default(),
backfill_requests: FnvHashMap::default(),
range_blocks_and_blobs_requests: FnvHashMap::default(),
backfill_blocks_and_blobs_requests: FnvHashMap::default(),
network_beacon_processor,
chain,
log,
@@ -148,266 +142,85 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
peer_id: PeerId,
batch_type: ByRangeRequestType,
request: BlocksByRangeRequest,
chain_id: ChainId,
batch_id: BatchId,
) -> Result<Id, &'static str> {
match batch_type {
ByRangeRequestType::Blocks => {
trace!(
self.log,
"Sending BlocksByRange request";
"method" => "BlocksByRange",
"count" => request.count(),
"peer" => %peer_id,
);
let request = Request::BlocksByRange(request);
let id = self.next_id();
let request_id = RequestId::Sync(SyncRequestId::RangeBlocks { id });
self.send_network_msg(NetworkMessage::SendRequest {
peer_id,
request,
request_id,
})?;
self.range_requests.insert(id, (chain_id, batch_id));
Ok(id)
}
ByRangeRequestType::BlocksAndBlobs => {
debug!(
self.log,
"Sending BlocksByRange and BlobsByRange requests";
"method" => "Mixed by range request",
"count" => request.count(),
"peer" => %peer_id,
);
let id = self.next_id();
trace!(
self.log,
"Sending BlocksByRange request";
"method" => "BlocksByRange",
"count" => request.count(),
"peer" => %peer_id,
);
self.send_network_msg(NetworkMessage::SendRequest {
peer_id,
request: Request::BlocksByRange(request.clone()),
request_id: RequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }),
})?;
// create the shared request id. This is fine since the rpc handles substream ids.
let id = self.next_id();
let request_id = RequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id });
if matches!(batch_type, ByRangeRequestType::BlocksAndBlobs) {
debug!(
self.log,
"Sending BlobsByRange requests";
"method" => "BlobsByRange",
"count" => request.count(),
"peer" => %peer_id,
);
// Create the blob request based on the blob request.
let blobs_request = Request::BlobsByRange(BlobsByRangeRequest {
// Create the blob request based on the blocks request.
self.send_network_msg(NetworkMessage::SendRequest {
peer_id,
request: Request::BlobsByRange(BlobsByRangeRequest {
start_slot: *request.start_slot(),
count: *request.count(),
});
let blocks_request = Request::BlocksByRange(request);
// Send both requests. Make sure both can be sent.
self.send_network_msg(NetworkMessage::SendRequest {
peer_id,
request: blocks_request,
request_id,
})?;
self.send_network_msg(NetworkMessage::SendRequest {
peer_id,
request: blobs_request,
request_id,
})?;
let block_blob_info = BlocksAndBlobsRequestInfo::default();
self.range_blocks_and_blobs_requests.insert(
id,
BlocksAndBlobsByRangeRequest {
chain_id,
batch_id,
block_blob_info,
},
);
Ok(id)
}
}),
request_id: RequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }),
})?;
}
Ok(id)
}
/// A blocks by range request sent by the backfill sync algorithm
pub fn backfill_blocks_by_range_request(
/// A blocks by range request sent by the range sync algorithm
pub fn blocks_and_blobs_by_range_request(
&mut self,
peer_id: PeerId,
batch_type: ByRangeRequestType,
request: BlocksByRangeRequest,
batch_id: BatchId,
sender_id: RangeRequestId,
) -> Result<Id, &'static str> {
match batch_type {
ByRangeRequestType::Blocks => {
trace!(
self.log,
"Sending backfill BlocksByRange request";
"method" => "BlocksByRange",
"count" => request.count(),
"peer" => %peer_id,
);
let request = Request::BlocksByRange(request);
let id = self.next_id();
let request_id = RequestId::Sync(SyncRequestId::BackFillBlocks { id });
self.send_network_msg(NetworkMessage::SendRequest {
peer_id,
request,
request_id,
})?;
self.backfill_requests.insert(id, batch_id);
Ok(id)
}
ByRangeRequestType::BlocksAndBlobs => {
debug!(
self.log,
"Sending backfill BlocksByRange and BlobsByRange requests";
"method" => "Mixed by range request",
"count" => request.count(),
"peer" => %peer_id,
);
// create the shared request id. This is fine since the rpc handles substream ids.
let id = self.next_id();
let request_id = RequestId::Sync(SyncRequestId::BackFillBlockAndBlobs { id });
// Create the blob request based on the blob request.
let blobs_request = Request::BlobsByRange(BlobsByRangeRequest {
start_slot: *request.start_slot(),
count: *request.count(),
});
let blocks_request = Request::BlocksByRange(request);
// Send both requests. Make sure both can be sent.
self.send_network_msg(NetworkMessage::SendRequest {
peer_id,
request: blocks_request,
request_id,
})?;
self.send_network_msg(NetworkMessage::SendRequest {
peer_id,
request: blobs_request,
request_id,
})?;
let block_blob_info = BlocksAndBlobsRequestInfo::default();
self.backfill_blocks_and_blobs_requests
.insert(id, (batch_id, block_blob_info));
Ok(id)
}
}
let id = self.blocks_by_range_request(peer_id, batch_type, request)?;
self.range_blocks_and_blobs_requests
.insert(id, (sender_id, BlocksAndBlobsRequestInfo::new(batch_type)));
Ok(id)
}
/// Response for a request that is only for blocks.
pub fn range_sync_block_only_response(
&mut self,
request_id: Id,
is_stream_terminator: bool,
) -> Option<(ChainId, BatchId)> {
if is_stream_terminator {
self.range_requests.remove(&request_id)
} else {
self.range_requests.get(&request_id).copied()
}
}
/// Received a blocks by range response for a request that couples blocks and blobs.
pub fn range_sync_block_and_blob_response(
&mut self,
request_id: Id,
block_or_blob: BlockOrBlob<T::EthSpec>,
) -> Option<(ChainId, BlocksAndBlobsByRangeResponse<T::EthSpec>)> {
match self.range_blocks_and_blobs_requests.entry(request_id) {
Entry::Occupied(mut entry) => {
let req = entry.get_mut();
let info = &mut req.block_blob_info;
match block_or_blob {
BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block),
BlockOrBlob::Blob(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar),
}
if info.is_finished() {
// If the request is finished, dequeue everything
let BlocksAndBlobsByRangeRequest {
chain_id,
batch_id,
block_blob_info,
} = entry.remove();
Some((
chain_id,
BlocksAndBlobsByRangeResponse {
batch_id,
responses: block_blob_info.into_responses(),
},
))
} else {
None
}
}
Entry::Vacant(_) => None,
}
}
pub fn range_sync_request_failed(
&mut self,
request_id: Id,
batch_type: ByRangeRequestType,
) -> Option<(ChainId, BatchId)> {
let req = match batch_type {
ByRangeRequestType::BlocksAndBlobs => self
.range_blocks_and_blobs_requests
.remove(&request_id)
.map(|req| (req.chain_id, req.batch_id)),
ByRangeRequestType::Blocks => self.range_requests.remove(&request_id),
};
if let Some(req) = req {
pub fn range_request_failed(&mut self, request_id: Id) -> Option<RangeRequestId> {
let sender_id = self
.range_blocks_and_blobs_requests
.remove(&request_id)
.map(|(sender_id, _info)| sender_id);
if let Some(sender_id) = sender_id {
debug!(
self.log,
"Range sync request failed";
"Sync range request failed";
"request_id" => request_id,
"batch_type" => ?batch_type,
"chain_id" => ?req.0,
"batch_id" => ?req.1
"sender_id" => ?sender_id
);
Some(req)
Some(sender_id)
} else {
debug!(self.log, "Range sync request failed"; "request_id" => request_id, "batch_type" => ?batch_type);
debug!(self.log, "Sync range request failed"; "request_id" => request_id);
None
}
}
pub fn backfill_request_failed(
&mut self,
request_id: Id,
batch_type: ByRangeRequestType,
) -> Option<BatchId> {
let batch_id = match batch_type {
ByRangeRequestType::BlocksAndBlobs => self
.backfill_blocks_and_blobs_requests
.remove(&request_id)
.map(|(batch_id, _info)| batch_id),
ByRangeRequestType::Blocks => self.backfill_requests.remove(&request_id),
};
if let Some(batch_id) = batch_id {
debug!(
self.log,
"Backfill sync request failed";
"request_id" => request_id,
"batch_type" => ?batch_type,
"batch_id" => ?batch_id
);
Some(batch_id)
} else {
debug!(self.log, "Backfill sync request failed"; "request_id" => request_id, "batch_type" => ?batch_type);
None
}
}
/// Response for a request that is only for blocks.
pub fn backfill_sync_only_blocks_response(
&mut self,
request_id: Id,
is_stream_terminator: bool,
) -> Option<BatchId> {
if is_stream_terminator {
self.backfill_requests.remove(&request_id)
} else {
self.backfill_requests.get(&request_id).copied()
}
}
/// Received a blocks by range or blobs by range response for a request that couples blocks '
/// and blobs.
pub fn backfill_sync_block_and_blob_response(
pub fn range_block_and_blob_response(
&mut self,
request_id: Id,
block_or_blob: BlockOrBlob<T::EthSpec>,
) -> Option<BlocksAndBlobsByRangeResponse<T::EthSpec>> {
match self.backfill_blocks_and_blobs_requests.entry(request_id) {
match self.range_blocks_and_blobs_requests.entry(request_id) {
Entry::Occupied(mut entry) => {
let (_, info) = entry.get_mut();
match block_or_blob {
@@ -416,12 +229,12 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
}
if info.is_finished() {
// If the request is finished, dequeue everything
let (batch_id, info) = entry.remove();
let responses = info.into_responses();
let (sender_id, info) = entry.remove();
let request_type = info.get_request_type();
Some(BlocksAndBlobsByRangeResponse {
batch_id,
responses,
sender_id,
request_type,
responses: info.into_responses(),
})
} else {
None
@@ -586,18 +399,10 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
pub fn insert_range_blocks_and_blobs_request(
&mut self,
id: Id,
request: BlocksAndBlobsByRangeRequest<T::EthSpec>,
sender_id: RangeRequestId,
info: BlocksAndBlobsRequestInfo<T::EthSpec>,
) {
self.range_blocks_and_blobs_requests.insert(id, request);
}
pub fn insert_backfill_blocks_and_blobs_requests(
&mut self,
id: Id,
batch_id: BatchId,
request: BlocksAndBlobsRequestInfo<T::EthSpec>,
) {
self.backfill_blocks_and_blobs_requests
.insert(id, (batch_id, request));
self.range_blocks_and_blobs_requests
.insert(id, (sender_id, info));
}
}

View File

@@ -1,5 +1,6 @@
use super::batch::{BatchInfo, BatchProcessingResult, BatchState};
use crate::network_beacon_processor::ChainSegmentProcessId;
use crate::sync::network_context::RangeRequestId;
use crate::sync::{
manager::Id, network_context::SyncNetworkContext, BatchOperationOutcome, BatchProcessResult,
};
@@ -905,7 +906,15 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
) -> ProcessingResult {
if let Some(batch) = self.batches.get_mut(&batch_id) {
let (request, batch_type) = batch.to_blocks_by_range_request();
match network.blocks_by_range_request(peer, batch_type, request, self.id, batch_id) {
match network.blocks_and_blobs_by_range_request(
peer,
batch_type,
request,
RangeRequestId::RangeSync {
chain_id: self.id,
batch_id,
},
) {
Ok(request_id) => {
// inform the batch about the new request
batch.start_downloading_from_peer(peer, request_id)?;

View File

@@ -384,7 +384,7 @@ mod tests {
use crate::NetworkMessage;
use super::*;
use crate::sync::network_context::BlockOrBlob;
use crate::sync::network_context::{BlockOrBlob, RangeRequestId};
use beacon_chain::builder::Witness;
use beacon_chain::eth1_chain::CachingEth1Backend;
use beacon_chain::parking_lot::RwLock;
@@ -548,6 +548,51 @@ mod tests {
(block_req_id, blob_req_id)
}
fn complete_range_block_and_blobs_response(
&mut self,
block_req: RequestId,
blob_req_opt: Option<RequestId>,
) -> (ChainId, BatchId, Id) {
if blob_req_opt.is_some() {
match block_req {
RequestId::Sync(crate::sync::manager::RequestId::RangeBlockAndBlobs { id }) => {
let _ = self
.cx
.range_block_and_blob_response(id, BlockOrBlob::Block(None));
let response = self
.cx
.range_block_and_blob_response(id, BlockOrBlob::Blob(None))
.unwrap();
let (chain_id, batch_id) =
TestRig::unwrap_range_request_id(response.sender_id);
(chain_id, batch_id, id)
}
other => panic!("unexpected request {:?}", other),
}
} else {
match block_req {
RequestId::Sync(crate::sync::manager::RequestId::RangeBlockAndBlobs { id }) => {
let response = self
.cx
.range_block_and_blob_response(id, BlockOrBlob::Block(None))
.unwrap();
let (chain_id, batch_id) =
TestRig::unwrap_range_request_id(response.sender_id);
(chain_id, batch_id, id)
}
other => panic!("unexpected request {:?}", other),
}
}
}
fn unwrap_range_request_id(sender_id: RangeRequestId) -> (ChainId, BatchId) {
if let RangeRequestId::RangeSync { chain_id, batch_id } = sender_id {
(chain_id, batch_id)
} else {
panic!("expected RangeSync request: {:?}", sender_id)
}
}
/// Produce a head peer
fn head_peer(
&self,
@@ -744,29 +789,8 @@ mod tests {
range.add_peer(&mut rig.cx, local_info, peer1, head_info);
let (block_req, blob_req_opt) = rig.grab_request(&peer1, fork);
let (chain1, batch1, id1) = if blob_req_opt.is_some() {
match block_req {
RequestId::Sync(crate::sync::manager::RequestId::RangeBlockAndBlobs { id }) => {
let _ = rig
.cx
.range_sync_block_and_blob_response(id, BlockOrBlob::Block(None));
let (chain1, response) = rig
.cx
.range_sync_block_and_blob_response(id, BlockOrBlob::Blob(None))
.unwrap();
(chain1, response.batch_id, id)
}
other => panic!("unexpected request {:?}", other),
}
} else {
match block_req {
RequestId::Sync(crate::sync::manager::RequestId::RangeBlocks { id }) => {
let (chain, batch) = rig.cx.range_sync_block_only_response(id, true).unwrap();
(chain, batch, id)
}
other => panic!("unexpected request {:?}", other),
}
};
let (chain1, batch1, id1) =
rig.complete_range_block_and_blobs_response(block_req, blob_req_opt);
// make the ee offline
rig.cx.update_execution_engine_state(EngineState::Offline);
@@ -782,29 +806,8 @@ mod tests {
range.add_peer(&mut rig.cx, local_info, peer2, finalized_info);
let (block_req, blob_req_opt) = rig.grab_request(&peer2, fork);
let (chain2, batch2, id2) = if blob_req_opt.is_some() {
match block_req {
RequestId::Sync(crate::sync::manager::RequestId::RangeBlockAndBlobs { id }) => {
let _ = rig
.cx
.range_sync_block_and_blob_response(id, BlockOrBlob::Block(None));
let (chain2, response) = rig
.cx
.range_sync_block_and_blob_response(id, BlockOrBlob::Blob(None))
.unwrap();
(chain2, response.batch_id, id)
}
other => panic!("unexpected request {:?}", other),
}
} else {
match block_req {
RequestId::Sync(crate::sync::manager::RequestId::RangeBlocks { id }) => {
let (chain, batch) = rig.cx.range_sync_block_only_response(id, true).unwrap();
(chain, batch, id)
}
other => panic!("unexpected request {:?}", other),
}
};
let (chain2, batch2, id2) =
rig.complete_range_block_and_blobs_response(block_req, blob_req_opt);
// send the response to the request
range.blocks_by_range_response(&mut rig.cx, peer2, chain2, batch2, id2, None);