From 2c9477de43e4ecc011158d1cfb3a59707f7b6100 Mon Sep 17 00:00:00 2001 From: Divma <26765164+divagant-martian@users.noreply.github.com> Date: Wed, 15 Mar 2023 11:04:45 -0500 Subject: [PATCH] Fix block and blob coupling in the network context (#4086) * update docs * introduce a temp enum to model an adjusted `BlockWrapper` and fix blob coupling * fix compilation issue * fix blob coupling in the network context * review comments --- .../src/sync/block_sidecar_coupling.rs | 54 ++++++++-------- beacon_node/network/src/sync/manager.rs | 4 +- .../network/src/sync/network_context.rs | 64 ++++++++++++++----- 3 files changed, 80 insertions(+), 42 deletions(-) diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 886c903970..438317d1cd 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -1,14 +1,13 @@ -use beacon_chain::blob_verification::BlockWrapper; +use super::network_context::TempBlockWrapper; use std::{collections::VecDeque, sync::Arc}; - -use types::{BlobsSidecar, EthSpec, SignedBeaconBlock}; +use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; #[derive(Debug, Default)] pub struct BlocksAndBlobsRequestInfo { /// Blocks we have received awaiting for their corresponding sidecar. accumulated_blocks: VecDeque>>, /// Sidecars we have received awaiting for their corresponding block. - accumulated_sidecars: VecDeque>>, + accumulated_sidecars: VecDeque>>, /// Whether the individual RPC request for blocks is finished or not. is_blocks_stream_terminated: bool, /// Whether the individual RPC request for sidecars is finished or not. @@ -23,14 +22,14 @@ impl BlocksAndBlobsRequestInfo { } } - pub fn add_sidecar_response(&mut self, maybe_sidecar: Option>>) { + pub fn add_sidecar_response(&mut self, maybe_sidecar: Option>>) { match maybe_sidecar { Some(sidecar) => self.accumulated_sidecars.push_back(sidecar), None => self.is_sidecars_stream_terminated = true, } } - pub fn into_responses(self) -> Result>, &'static str> { + pub fn into_responses(self) -> Result>, &'static str> { let BlocksAndBlobsRequestInfo { accumulated_blocks, mut accumulated_sidecars, @@ -39,28 +38,33 @@ impl BlocksAndBlobsRequestInfo { // ASSUMPTION: There can't be more more blobs than blocks. i.e. sending any blob (empty // included) for a skipped slot is not permitted. - let pairs = accumulated_blocks - .into_iter() - .map(|beacon_block| { - if accumulated_sidecars - .front() - .map(|sidecar| sidecar.beacon_block_slot == beacon_block.slot()) - .unwrap_or(false) - { - let blobs_sidecar = accumulated_sidecars.pop_front(); - BlockWrapper::new(beacon_block, blobs_sidecar) - } else { - BlockWrapper::new(beacon_block, None) - } - }) - .collect::>(); + let mut responses = Vec::with_capacity(accumulated_blocks.len()); + let mut blob_iter = accumulated_sidecars.into_iter().peekable(); + for block in accumulated_blocks.into_iter() { + let mut blob_list = Vec::with_capacity(T::max_blobs_per_block()); + while { + let pair_next_blob = blob_iter + .peek() + .map(|sidecar| sidecar.slot == block.slot()) + .unwrap_or(false); + pair_next_blob + } { + blob_list.push(blob_iter.next().expect("iterator is not empty")); + } - // if accumulated sidecars is not empty, throw an error. - if !accumulated_sidecars.is_empty() { - return Err("Received more sidecars than blocks"); + if blob_list.is_empty() { + responses.push(TempBlockWrapper::Block(block)) + } else { + responses.push(TempBlockWrapper::BlockAndBlobList(block, blob_list)) + } } - Ok(pairs) + // if accumulated sidecars is not empty, throw an error. + if blob_iter.next().is_some() { + return Err("Received sidecars that don't pair well"); + } + + Ok(responses) } pub fn is_finished(&self) -> bool { diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 2c8e0f047f..353d3e896e 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -78,11 +78,11 @@ pub enum RequestId { ParentLookup { id: Id }, /// Request was from the backfill sync algorithm. BackFillBlocks { id: Id }, - /// Backfill request for blob sidecars. + /// Backfill request that is composed by both a block range request and a blob range request. BackFillBlobs { id: Id }, /// The request was from a chain in the range sync algorithm. RangeBlocks { id: Id }, - /// The request was from a chain in range, asking for ranges blob sidecars. + /// Range request that is composed by both a block range request and a blob range request. RangeBlobs { id: Id }, } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 2da6a41e24..10f7f32955 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -18,7 +18,13 @@ use slog::{debug, trace, warn}; use std::collections::hash_map::Entry; use std::sync::Arc; use tokio::sync::mpsc; -use types::{BlobsSidecar, EthSpec, SignedBeaconBlock}; +use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; + +// Temporary struct to handle incremental changes in the meantime. +pub enum TempBlockWrapper { + Block(Arc>), + BlockAndBlobList(Arc>, Vec>>), +} pub struct BlocksAndBlobsByRangeResponse { pub batch_id: BatchId, @@ -71,7 +77,7 @@ pub struct SyncNetworkContext { /// Small enumeration to make dealing with block and blob requests easier. pub enum BlockOrBlobs { Block(Option>>), - Blobs(Option>>), + Blobs(Option>>), } impl From>>> for BlockOrBlobs { @@ -80,8 +86,8 @@ impl From>>> for BlockOrBlobs { } } -impl From>>> for BlockOrBlobs { - fn from(blob: Option>>) -> Self { +impl From>>> for BlockOrBlobs { + fn from(blob: Option>>) -> Self { BlockOrBlobs::Blobs(blob) } } @@ -323,13 +329,25 @@ impl SyncNetworkContext { block_blob_info, } = entry.remove(); - Some(( - chain_id, - BlocksAndBlobsByRangeResponse { - batch_id, - responses: block_blob_info.into_responses(), - }, - )) + let responses = block_blob_info.into_responses(); + let unimplemented_info = match responses { + Ok(responses) => { + let infos = responses + .into_iter() + .map(|temp_block_wrapper| match temp_block_wrapper { + TempBlockWrapper::Block(block) => { + format!("slot{}", block.slot()) + } + TempBlockWrapper::BlockAndBlobList(block, blob_list) => { + format!("slot{}({} blobs)", block.slot(), blob_list.len()) + } + }) + .collect::>(); + infos.join(", ") + } + Err(e) => format!("Error: {e}"), + }; + unimplemented!("Here we are supposed to return a block possibly paired with a Bundle of blobs, but only have a list of individual blobs. This is what we got from the network: ChainId[{chain_id}] BatchId[{batch_id}] {unimplemented_info}") } else { None } @@ -396,10 +414,26 @@ impl SyncNetworkContext { if info.is_finished() { // If the request is finished, dequeue everything let (batch_id, info) = entry.remove(); - Some(BlocksAndBlobsByRangeResponse { - batch_id, - responses: info.into_responses(), - }) + + let responses = info.into_responses(); + let unimplemented_info = match responses { + Ok(responses) => { + let infos = responses + .into_iter() + .map(|temp_block_wrapper| match temp_block_wrapper { + TempBlockWrapper::Block(block) => { + format!("slot{}", block.slot()) + } + TempBlockWrapper::BlockAndBlobList(block, blob_list) => { + format!("slot{}({} blobs)", block.slot(), blob_list.len()) + } + }) + .collect::>(); + infos.join(", ") + } + Err(e) => format!("Error: {e}"), + }; + unimplemented!("Here we are supposed to return a block possibly paired with a Bundle of blobs for backfill, but only have a list of individual blobs. This is what we got from the network: BatchId[{batch_id}]{unimplemented_info}") } else { None }