From a17405ae206caec696660499a0bdc7e4e648fffe Mon Sep 17 00:00:00 2001 From: realbigsean Date: Fri, 31 Mar 2023 10:45:52 -0400 Subject: [PATCH] impl logic for triggerng blob lookups along with block lookups --- .../network/src/sync/network_context.rs | 115 ++++++++++++------ 1 file changed, 80 insertions(+), 35 deletions(-) diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index d6bab8729a..42b3279e85 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -4,19 +4,22 @@ use super::block_sidecar_coupling::BlocksAndBlobsRequestInfo; use super::manager::{Id, RequestId as SyncRequestId}; use super::range_sync::{BatchId, ByRangeRequestType, ChainId}; +use crate::beacon_processor::Work::BlobsByRootsRequest; use crate::beacon_processor::WorkEvent; use crate::service::{NetworkMessage, RequestId}; use crate::status::ToStatusMessage; use beacon_chain::blob_verification::BlockWrapper; use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState}; use fnv::FnvHashMap; -use lighthouse_network::rpc::methods::BlobsByRangeRequest; +use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest}; use lighthouse_network::rpc::{BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason}; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request}; use slog::{debug, trace, warn}; +use ssz_types::VariableList; use std::collections::hash_map::Entry; use std::sync::Arc; use tokio::sync::mpsc; +use types::blob_sidecar::BlobIdentifier; use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; pub struct BlocksAndBlobsByRangeResponse { @@ -414,34 +417,56 @@ impl SyncNetworkContext { peer_id: PeerId, request: BlocksByRootRequest, ) -> Result { - let request = if self + let id = self.next_id(); + let request_id = RequestId::Sync(SyncRequestId::SingleBlock { id }); + if self .chain .is_data_availability_check_required() .map_err(|_| "Unable to read slot clock")? { + let mut blob_ids = + VariableList::new(vec![]).map_err(|_| "could not create blob request list")?; + for block_root in request.block_roots.iter() { + // Request all blobs because we have no way of knowing how many blobs exist for a block + // until we have the block. + let blobs_per_block = T::EthSpec::max_blobs_per_block(); + for i in 0..blobs_per_block { + blob_ids + .push(BlobIdentifier { + block_root: *block_root, + index: i as u64, + }) + .map_err(|_| "too many blobs in by root request")?; + } + } + + let blobs_count = blob_ids.len(); + let blob_request = Request::BlobsByRoot(BlobsByRootRequest { blob_ids }); + self.send_network_msg(NetworkMessage::SendRequest { + peer_id, + request: blob_request, + request_id, + })?; trace!( self.log, "Sending BlobsByRoot Request"; "method" => "BlobsByRoot", - "count" => request.block_roots.len(), + "count" => blobs_count, "peer" => %peer_id ); - unimplemented!("There is no longer such thing as a single block lookup, since we nede to ask for blobs and blocks separetely"); - } else { - trace!( - self.log, - "Sending BlocksByRoot Request"; - "method" => "BlocksByRoot", - "count" => request.block_roots.len(), - "peer" => %peer_id - ); - Request::BlocksByRoot(request) - }; - let id = self.next_id(); - let request_id = RequestId::Sync(SyncRequestId::SingleBlock { id }); + } + + trace!( + self.log, + "Sending BlocksByRoot Request"; + "method" => "BlocksByRoot", + "count" => request.block_roots.len(), + "peer" => %peer_id + ); + self.send_network_msg(NetworkMessage::SendRequest { peer_id, - request, + request: Request::BlocksByRoot(request), request_id, })?; Ok(id) @@ -453,36 +478,56 @@ impl SyncNetworkContext { peer_id: PeerId, request: BlocksByRootRequest, ) -> Result { - let request = if self + let id = self.next_id(); + let request_id = RequestId::Sync(SyncRequestId::ParentLookup { id }); + if self .chain .is_data_availability_check_required() .map_err(|_| "Unable to read slot clock")? { + let mut blob_ids = + VariableList::new(vec![]).map_err(|e| "could not create blob request list")?; + for block_root in request.block_roots.iter() { + // Request all blobs because we have no way of knowing how many blobs exist for a block + // until we have the block. + let blobs_per_block = T::EthSpec::max_blobs_per_block(); + for i in 0..blobs_per_block { + blob_ids + .push(BlobIdentifier { + block_root: *block_root, + index: i as u64, + }) + .map_err(|_| "too many blobs in by root request")?; + } + } + + let blobs_count = blob_ids.len(); + let blob_request = Request::BlobsByRoot(BlobsByRootRequest { blob_ids }); + self.send_network_msg(NetworkMessage::SendRequest { + peer_id, + request: blob_request, + request_id, + })?; trace!( self.log, "Sending BlobsByRoot Request"; "method" => "BlobsByRoot", - "count" => request.block_roots.len(), + "count" => blobs_count, "peer" => %peer_id ); - unimplemented!( - "Parent requests now need to interleave blocks and blobs or something like that." - ) - } else { - trace!( - self.log, - "Sending BlocksByRoot Request"; - "method" => "BlocksByRoot", - "count" => request.block_roots.len(), - "peer" => %peer_id - ); - Request::BlocksByRoot(request) - }; - let id = self.next_id(); - let request_id = RequestId::Sync(SyncRequestId::ParentLookup { id }); + } + + trace!( + self.log, + "Sending BlocksByRoot Request"; + "method" => "BlocksByRoot", + "count" => request.block_roots.len(), + "peer" => %peer_id + ); + self.send_network_msg(NetworkMessage::SendRequest { peer_id, - request, + request: Request::BlocksByRoot(request), request_id, })?; Ok(id)