diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 56a5f24586..76962b373f 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -258,7 +258,7 @@ impl Processor { ); if let RequestId::Sync(id) = request_id { - self.send_to_sync(SyncMessage::RpcBlobs { + self.send_to_sync(SyncMessage::RpcBlob { peer_id, request_id: id, blob_sidecar, @@ -330,7 +330,7 @@ impl Processor { "Received BlobsByRoot Response"; "peer" => %peer_id, ); - self.send_to_sync(SyncMessage::RpcBlobs { + self.send_to_sync(SyncMessage::RpcBlob { request_id, peer_id, blob_sidecar, diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 768b95273e..43921b585a 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -35,7 +35,7 @@ use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart}; use super::block_lookups::BlockLookups; -use super::network_context::{BlockOrBlobs, SyncNetworkContext}; +use super::network_context::{BlockOrBlob, SyncNetworkContext}; use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent}; @@ -86,6 +86,10 @@ pub enum RequestId { RangeBlobs { id: Id }, } +// TODO(diva) I'm updating functions one at a time, but this should be revisited because I think +// some code paths that are split for blobs and blocks can be made just one after sync as a whole +// is updated. + #[derive(Debug)] /// A message that can be sent to the sync manager thread. pub enum SyncMessage { @@ -101,7 +105,7 @@ pub enum SyncMessage { }, /// A blob has been received from the RPC. 
- RpcBlobs { + RpcBlob { request_id: RequestId, peer_id: PeerId, blob_sidecar: Option>>, @@ -554,7 +558,12 @@ impl SyncManager { beacon_block, seen_timestamp, } => { - self.rpc_block_received(request_id, peer_id, beacon_block, seen_timestamp); + self.rpc_block_or_blob_received( + request_id, + peer_id, + beacon_block.into(), + seen_timestamp, + ); } SyncMessage::UnknownBlock(peer_id, block, block_root) => { // If we are not synced or within SLOT_IMPORT_TOLERANCE of the block, ignore @@ -638,12 +647,17 @@ impl SyncManager { .block_lookups .parent_chain_processed(chain_hash, result, &mut self.network), }, - SyncMessage::RpcBlobs { + SyncMessage::RpcBlob { request_id, peer_id, blob_sidecar, seen_timestamp, - } => self.rpc_blobs_received(request_id, peer_id, blob_sidecar, seen_timestamp), + } => self.rpc_block_or_blob_received( + request_id, + peer_id, + blob_sidecar.into(), + seen_timestamp, + ), } } @@ -702,30 +716,50 @@ impl SyncManager { } } - fn rpc_block_received( + fn rpc_block_or_blob_received( &mut self, request_id: RequestId, peer_id: PeerId, - beacon_block: Option>>, + block_or_blob: BlockOrBlob, seen_timestamp: Duration, ) { match request_id { - RequestId::SingleBlock { id } => self.block_lookups.single_block_lookup_response( - id, - peer_id, - beacon_block.map(|block| block.into()), - seen_timestamp, - &mut self.network, - ), - RequestId::ParentLookup { id } => self.block_lookups.parent_lookup_response( - id, - peer_id, - beacon_block.map(|block| block.into()), - seen_timestamp, - &mut self.network, - ), + RequestId::SingleBlock { id } => { + // TODO(diva) adjust when dealing with by root requests. 
This code is here to + // satisfy dead code analysis + match block_or_blob { + BlockOrBlob::Block(maybe_block) => { + self.block_lookups.single_block_lookup_response( + id, + peer_id, + maybe_block.map(BlockWrapper::Block), + seen_timestamp, + &mut self.network, + ) + } + BlockOrBlob::Sidecar(_) => unimplemented!("Mismatch between BlockWrapper and what the network receives needs to be handled first."), + } + } + RequestId::ParentLookup { id } => { + // TODO(diva) adjust when dealing with by root requests. This code is here to + // satisfy dead code analysis + match block_or_blob { + BlockOrBlob::Block(maybe_block) => self.block_lookups.parent_lookup_response( + id, + peer_id, + maybe_block.map(BlockWrapper::Block), + seen_timestamp, + &mut self.network, + ), + BlockOrBlob::Sidecar(_) => unimplemented!("Mismatch between BlockWrapper and what the network receives needs to be handled first."), + } + } RequestId::BackFillBlocks { id } => { - let is_stream_terminator = beacon_block.is_none(); + let maybe_block = match block_or_blob { + BlockOrBlob::Block(maybe_block) => maybe_block, + BlockOrBlob::Sidecar(_) => todo!("I think this is unreachable"), + }; + let is_stream_terminator = maybe_block.is_none(); if let Some(batch_id) = self .network .backfill_sync_only_blocks_response(id, is_stream_terminator) @@ -735,7 +769,7 @@ impl SyncManager { batch_id, &peer_id, id, - beacon_block.map(|block| block.into()), + maybe_block.map(|block| block.into()), ) { Ok(ProcessResult::SyncCompleted) => self.update_sync_state(), Ok(ProcessResult::Successful) => {} @@ -748,7 +782,11 @@ impl SyncManager { } } RequestId::RangeBlocks { id } => { - let is_stream_terminator = beacon_block.is_none(); + let maybe_block = match block_or_blob { + BlockOrBlob::Block(maybe_block) => maybe_block, + BlockOrBlob::Sidecar(_) => todo!("I think this should be unreachable, since this is a range only-blocks request, and the network should not accept this chunk at all. 
Needs better handling"), + }; + let is_stream_terminator = maybe_block.is_none(); if let Some((chain_id, batch_id)) = self .network .range_sync_block_response(id, is_stream_terminator) @@ -759,28 +797,28 @@ impl SyncManager { chain_id, batch_id, id, - beacon_block.map(|block| block.into()), + maybe_block.map(|block| block.into()), ); self.update_sync_state(); } } RequestId::BackFillBlobs { id } => { - self.blobs_backfill_response(id, peer_id, beacon_block.into()) + self.backfill_block_and_blobs_response(id, peer_id, block_or_blob) } RequestId::RangeBlobs { id } => { - self.blobs_range_response(id, peer_id, beacon_block.into()) + self.range_block_and_blobs_response(id, peer_id, block_or_blob) } } } /// Handles receiving a response for a range sync request that should have both blocks and /// blobs. - fn blobs_range_response( + fn range_block_and_blobs_response( &mut self, id: Id, peer_id: PeerId, - block_or_blob: BlockOrBlobs, + block_or_blob: BlockOrBlob, ) { if let Some((chain_id, resp)) = self .network @@ -822,11 +860,11 @@ impl SyncManager { /// Handles receiving a response for a Backfill sync request that should have both blocks and /// blobs. - fn blobs_backfill_response( + fn backfill_block_and_blobs_response( &mut self, id: Id, peer_id: PeerId, - block_or_blob: BlockOrBlobs, + block_or_blob: BlockOrBlob, ) { if let Some(resp) = self .network @@ -871,32 +909,6 @@ impl SyncManager { } } } - - fn rpc_blobs_received( - &mut self, - request_id: RequestId, - _peer_id: PeerId, - _maybe_blob: Option::EthSpec>>>, - _seen_timestamp: Duration, - ) { - match request_id { - RequestId::SingleBlock { .. } | RequestId::ParentLookup { .. } => { - unreachable!("There is no such thing as a singular 'by root' glob request that is not accompanied by the block") - } - RequestId::BackFillBlocks { .. } => { - unreachable!("An only blocks request does not receive sidecars") - } - RequestId::BackFillBlobs { .. 
} => { - unimplemented!("Adjust backfill sync"); - } - RequestId::RangeBlocks { .. } => { - unreachable!("Only-blocks range requests don't receive sidecars") - } - RequestId::RangeBlobs { id: _ } => { - unimplemented!("Adjust range"); - } - } - } } impl From>> for BlockProcessResult { diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 10f7f32955..974d8dbd8c 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -75,20 +75,20 @@ pub struct SyncNetworkContext { } /// Small enumeration to make dealing with block and blob requests easier. -pub enum BlockOrBlobs { +pub enum BlockOrBlob { Block(Option>>), - Blobs(Option>>), + Sidecar(Option>>), } -impl From>>> for BlockOrBlobs { +impl From>>> for BlockOrBlob { fn from(block: Option>>) -> Self { - BlockOrBlobs::Block(block) + BlockOrBlob::Block(block) } } -impl From>>> for BlockOrBlobs { +impl From>>> for BlockOrBlob { fn from(blob: Option>>) -> Self { - BlockOrBlobs::Blobs(blob) + BlockOrBlob::Sidecar(blob) } } @@ -311,15 +311,15 @@ impl SyncNetworkContext { pub fn range_sync_block_and_blob_response( &mut self, request_id: Id, - block_or_blob: BlockOrBlobs, + block_or_blob: BlockOrBlob, ) -> Option<(ChainId, BlocksAndBlobsByRangeResponse)> { match self.range_blocks_and_blobs_requests.entry(request_id) { Entry::Occupied(mut entry) => { let req = entry.get_mut(); let info = &mut req.block_blob_info; match block_or_blob { - BlockOrBlobs::Block(maybe_block) => info.add_block_response(maybe_block), - BlockOrBlobs::Blobs(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), + BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block), + BlockOrBlob::Sidecar(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), } if info.is_finished() { // If the request is finished, dequeue everything @@ -402,14 +402,14 @@ impl SyncNetworkContext { pub fn 
backfill_sync_block_and_blob_response( &mut self, request_id: Id, - block_or_blob: BlockOrBlobs, + block_or_blob: BlockOrBlob, ) -> Option> { match self.backfill_blocks_and_blobs_requests.entry(request_id) { Entry::Occupied(mut entry) => { let (_, info) = entry.get_mut(); match block_or_blob { - BlockOrBlobs::Block(maybe_block) => info.add_block_response(maybe_block), - BlockOrBlobs::Blobs(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), + BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block), + BlockOrBlob::Sidecar(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), } if info.is_finished() { // If the request is finished, dequeue everything