From 38e0994dc46855e352e025ee6bdd0295b96c53cd Mon Sep 17 00:00:00 2001 From: realbigsean Date: Tue, 4 Apr 2023 12:38:01 -0400 Subject: [PATCH] make single block lookup generic --- beacon_node/beacon_chain/src/beacon_chain.rs | 8 +- .../src/data_availability_checker.rs | 9 +- beacon_node/http_api/src/publish_blocks.rs | 2 +- .../beacon_processor/worker/gossip_methods.rs | 6 +- .../network/src/sync/block_lookups/mod.rs | 48 ++++---- .../src/sync/block_lookups/parent_lookup.rs | 10 +- .../sync/block_lookups/single_block_lookup.rs | 113 ++++++++++++------ beacon_node/network/src/sync/manager.rs | 3 + 8 files changed, 125 insertions(+), 74 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 4986f6251a..5a6c043933 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -190,7 +190,7 @@ pub enum WhenSlotSkipped { #[derive(Debug, PartialEq)] pub enum AvailabilityProcessingStatus { - PendingBlobs(Vec), + PendingBlobs(Hash256, Vec), PendingBlock(Hash256), Imported(Hash256), } @@ -2631,7 +2631,7 @@ impl BeaconChain { AvailabilityProcessingStatus::Imported(_) => { // The block was imported successfully. 
} - AvailabilityProcessingStatus::PendingBlobs(blobs) => {} + AvailabilityProcessingStatus::PendingBlobs(block_root, blobs) => {} AvailabilityProcessingStatus::PendingBlock(_) => { // doesn't makes sense } @@ -2880,8 +2880,8 @@ impl BeaconChain { Availability::PendingBlock(block_root) => { Ok(AvailabilityProcessingStatus::PendingBlock(block_root)) } - Availability::PendingBlobs(blob_ids) => { - Ok(AvailabilityProcessingStatus::PendingBlobs(blob_ids)) + Availability::PendingBlobs(block_root, blob_ids) => { + Ok(AvailabilityProcessingStatus::PendingBlobs(block_root, blob_ids)) } } } diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 69fd6da4db..96d8ae3810 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -120,7 +120,7 @@ impl ReceivedComponents { /// Indicates if the block is fully `Available` or if we need blobs or blocks /// to "complete" the requirements for an `AvailableBlock`. 
pub enum Availability { - PendingBlobs(Vec), + PendingBlobs(Hash256, Vec), PendingBlock(Hash256), Available(Box>), } @@ -254,8 +254,9 @@ impl DataAvailabilityChecker { } Entry::Vacant(vacant_entry) => { let all_blob_ids = executed_block.get_all_blob_ids(); + let block_root = executed_block.import_data.block_root; vacant_entry.insert(ReceivedComponents::new_from_block(executed_block)); - Availability::PendingBlobs(all_blob_ids) + Availability::PendingBlobs(block_root, all_blob_ids) } }; @@ -312,9 +313,11 @@ impl DataAvailabilityChecker { .unwrap_or(true) }); + let block_root = executed_block.import_data.block_root; + let _ = received_components.executed_block.insert(executed_block); - Ok(Availability::PendingBlobs(missing_blob_ids)) + Ok(Availability::PendingBlobs(block_root, missing_blob_ids)) } } diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index d722cf6c9b..5c759cdb2a 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -145,7 +145,7 @@ pub async fn publish_block( ); Err(warp_utils::reject::broadcast_without_import(msg)) } - Ok(AvailabilityProcessingStatus::PendingBlobs(blob_ids)) => { + Ok(AvailabilityProcessingStatus::PendingBlobs(_, blob_ids)) => { let msg = format!("Missing blobs {:?}", blob_ids); error!( log, diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 431a878cae..141c958676 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -691,9 +691,10 @@ impl Worker { // add to metrics // logging } - Ok(AvailabilityProcessingStatus::PendingBlobs(pending_blobs)) => self + Ok(AvailabilityProcessingStatus::PendingBlobs(block_root, pending_blobs)) => self .send_sync_message(SyncMessage::MissingBlobs { peer_id, + block_root, pending_blobs, search_delay: 
Duration::from_secs(0), //TODO(sean) update }), @@ -1064,10 +1065,11 @@ impl Worker { "block_root" => %block_root ); } - Ok(AvailabilityProcessingStatus::PendingBlobs(pending_blobs)) => { + Ok(AvailabilityProcessingStatus::PendingBlobs(block_root, pending_blobs)) => { // make rpc request for blob self.send_sync_message(SyncMessage::MissingBlobs { peer_id, + block_root, pending_blobs, search_delay: Duration::from_secs(0), //TODO(sean) update }); diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index a531d42603..66410fd360 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -8,6 +8,7 @@ use beacon_chain::blob_verification::AsBlock; use beacon_chain::blob_verification::BlockWrapper; use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; use fnv::FnvHashMap; +use itertools::Itertools; use lighthouse_network::rpc::{RPCError, RPCResponseErrorCode}; use lighthouse_network::{PeerAction, PeerId}; use lru_cache::LRUTimeCache; @@ -136,23 +137,19 @@ impl BlockLookups { pub fn search_blobs( &mut self, + block_root: Hash256, blob_ids: Vec, peer_id: PeerId, cx: &mut SyncNetworkContext, ) { - todo!() - - // - // let hash = Hash256::zero(); - // - // // Do not re-request a blo that is already being requested - // if self - // .single_blob_lookups - // .values_mut() - // .any(|single_block_request| single_block_request.add_peer(&hash, &peer_id)) - // { - // return; - // } + // Do not re-request blobs that are already being requested + if self + .single_blob_lookups + .values_mut() + .any(|single_block_request| single_block_request.add_peer(&blob_ids, &peer_id)) + { + return; + } // // if self.parent_lookups.iter_mut().any(|parent_req| { // parent_req.add_peer(&hash, &peer_id) || parent_req.contains_block(&hash) // }) { // return; // } @@ -208,12 +205,13 @@ pub fn search_blobs_delayed( &mut self, peer_id: PeerId, + block_root:
Hash256, blob_ids: Vec, delay: Duration, cx: &mut SyncNetworkContext, ) { //TODO(sean) handle delay - self.search_blobs(blob_ids, peer_id, cx); + self.search_blobs(block_root, blob_ids, peer_id, cx); } /// If a block is attempted to be processed but we do not know its parent, this function is @@ -314,7 +312,7 @@ impl BlockLookups { let mut req = request.remove(); debug!(self.log, "Single block lookup failed"; - "peer_id" => %peer_id, "error" => msg, "block_root" => %req.hash); + "peer_id" => %peer_id, "error" => msg, "block_root" => %req.requested_thing); // try the request again if possible if let Ok((peer_id, request)) = req.request_block() { if let Ok(id) = cx.single_block_lookup_request(peer_id, request) { @@ -469,7 +467,7 @@ impl BlockLookups { trace!( self.log, "Single block request failed on peer disconnection"; - "block_root" => %req.hash, + "block_root" => %req.requested_thing, "peer_id" => %peer_id, "reason" => <&str>::from(e), ); @@ -519,7 +517,7 @@ impl BlockLookups { pub fn single_block_lookup_failed(&mut self, id: Id, cx: &mut SyncNetworkContext) { if let Some(mut request) = self.single_block_lookups.remove(&id) { request.register_failure_downloading(); - trace!(self.log, "Single block lookup failed"; "block" => %request.hash); + trace!(self.log, "Single block lookup failed"; "block" => %request.requested_thing); if let Ok((peer_id, block_request)) = request.request_block() { if let Ok(request_id) = cx.single_block_lookup_request(peer_id, block_request) { self.single_block_lookups.insert(request_id, request); @@ -551,7 +549,7 @@ impl BlockLookups { } }; - let root = req.hash; + let root = req.requested_thing; let peer_id = match req.processing_peer() { Ok(peer) => peer, Err(_) => return, @@ -562,8 +560,8 @@ impl BlockLookups { AvailabilityProcessingStatus::Imported(hash) => { trace!(self.log, "Single block processing succeeded"; "block" => %root); } - AvailabilityProcessingStatus::PendingBlobs(blobs_ids) => { - self.search_blobs(blobs_ids, peer_id, 
cx); + AvailabilityProcessingStatus::PendingBlobs(block_root, blobs_ids) => { + self.search_blobs(block_root, blobs_ids, peer_id, cx); } AvailabilityProcessingStatus::PendingBlock(hash) => { warn!(self.log, "Block processed but returned PendingBlock"; "block" => %hash); @@ -654,7 +652,7 @@ impl BlockLookups { AvailabilityProcessingStatus::Imported(hash) => { trace!(self.log, "Parent block processing succeeded"; &parent_lookup) } - AvailabilityProcessingStatus::PendingBlobs(blobs) => { + AvailabilityProcessingStatus::PendingBlobs(block_root, blobs) => { // trigger? } AvailabilityProcessingStatus::PendingBlock(hash) => { @@ -679,8 +677,8 @@ impl BlockLookups { BlockProcessResult::Ok(AvailabilityProcessingStatus::PendingBlock(_)) => { // doesn't make sense } - BlockProcessResult::Ok(AvailabilityProcessingStatus::PendingBlobs(blobs_ids)) => { - self.search_blobs(blobs_ids, peer_id, cx); + BlockProcessResult::Ok(AvailabilityProcessingStatus::PendingBlobs(block_root, blobs_ids)) => { + self.search_blobs(block_root, blobs_ids, peer_id, cx); } BlockProcessResult::Err(BlockError::ParentUnknown(block)) => { // TODO(sean) how do we handle this erroring? 
@@ -689,8 +687,10 @@ impl BlockLookups { .data_availability_checker .get_missing_blob_ids(block.clone(), None) .unwrap_or_default(); + if let Some(block_root) = missing_ids.first().map(|first_id| first_id.block_root){ + self.search_blobs(block_root, missing_ids, peer_id, cx); + } parent_lookup.add_block(block); - self.search_blobs(missing_ids, peer_id, cx); self.request_parent(parent_lookup, cx); } BlockProcessResult::Ok(AvailabilityProcessingStatus::Imported(_)) diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index 5d669ae1e4..c9b49c1907 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -11,6 +11,7 @@ use std::sync::Arc; use store::Hash256; use strum::IntoStaticStr; use types::{BlobSidecar, SignedBeaconBlock}; +use crate::sync::block_lookups::single_block_lookup::SingleBlobRequest; use super::single_block_lookup::{self, SingleBlockRequest}; @@ -30,6 +31,7 @@ pub(crate) struct ParentLookup { downloaded_blobs: Vec>>>>, /// Request of the last parent. current_parent_request: SingleBlockRequest, + current_parent_blobs_request: SingleBlobRequest, /// Id of the last parent request. 
current_parent_request_id: Option, } @@ -69,12 +71,14 @@ impl ParentLookup { ) -> Self { let (block, blobs) = block_wrapper.deconstruct(); let current_parent_request = SingleBlockRequest::new(block.parent_root(), peer_id); + let current_parent_blobs_request = todo!(); Self { chain_hash: block_root, downloaded_blocks: vec![(block_root, block)], downloaded_blobs: vec![blobs], current_parent_request, + current_parent_blobs_request, current_parent_request_id: None, } } @@ -105,11 +109,11 @@ impl ParentLookup { pub fn add_block(&mut self, block_wrapper: BlockWrapper) { let next_parent = block_wrapper.parent_root(); - let current_root = self.current_parent_request.hash; + let current_root = self.current_parent_request.requested_thing; let (block, blobs) = block_wrapper.deconstruct(); self.downloaded_blocks.push((current_root, block)); self.downloaded_blobs.push(blobs); - self.current_parent_request.hash = next_parent; + self.current_parent_request.requested_thing = next_parent; self.current_parent_request.state = single_block_lookup::State::AwaitingDownload; self.current_parent_request_id = None; } @@ -133,7 +137,7 @@ impl ParentLookup { downloaded_blocks, downloaded_blobs, current_parent_request, - current_parent_request_id: _, + current_parent_blobs_request, current_parent_request_id: _, } = self; let block_count = downloaded_blocks.len(); let mut blocks = Vec::with_capacity(block_count); diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index d486f4ba78..f429ee8edc 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -2,23 +2,27 @@ use super::RootBlockTuple; use beacon_chain::blob_verification::AsBlock; use beacon_chain::blob_verification::BlockWrapper; use beacon_chain::get_block_root; -use lighthouse_network::{rpc::BlocksByRootRequest, PeerId}; +use 
lighthouse_network::{rpc::BlocksByRootRequest, PeerId, Request}; use rand::seq::IteratorRandom; use ssz_types::VariableList; use std::collections::HashSet; use std::sync::Arc; use store::{EthSpec, Hash256}; use strum::IntoStaticStr; +use lighthouse_network::rpc::methods::BlobsByRootRequest; use types::blob_sidecar::BlobIdentifier; -use types::SignedBeaconBlock; +use types::{BlobSidecar, SignedBeaconBlock}; + +pub type SingleBlockRequest = SingleLookupRequest; +pub type SingleBlobRequest = SingleLookupRequest>; /// Object representing a single block lookup request. /// //previously assumed we would have a single block. Now we may have the block but not the blobs #[derive(PartialEq, Eq)] -pub struct SingleBlockRequest { +pub struct SingleLookupRequest { /// The hash of the requested block. - pub hash: Hash256, + pub requested_thing: T, /// State of this request. pub state: State, /// Peers that should have this block. @@ -31,21 +35,60 @@ pub struct SingleBlockRequest { failed_downloading: u8, } -#[derive(PartialEq, Eq)] -pub struct SingleBlobRequest { - /// The hash of the requested block. - pub hash: Hash256, - pub blob_ids: Vec, - /// State of this request. - pub state: State, - /// Peers that should have this block. - pub available_peers: HashSet, - /// Peers from which we have requested this block. - pub used_peers: HashSet, - /// How many times have we attempted to process this block. - failed_processing: u8, - /// How many times have we attempted to download this block. 
- failed_downloading: u8, +pub trait RequestableThing { + type Request; + type Response; + type WrappedResponse; + fn verify_response(&self, response: &Self::Response) -> bool; + fn make_request(&self) -> Self::Request; + fn wrapped_response(&self, response: Self::Response) -> Self::WrappedResponse; + fn is_useful(&self, other: &Self) -> bool; +} + +impl RequestableThing for Hash256 { + type Request = BlocksByRootRequest; + type Response = Arc>; + type WrappedResponse = RootBlockTuple; + fn verify_response(&self, response: &Self::Response) -> bool{ + // Compute the block root using this specific function so that we can get timing + // metrics. + let block_root = get_block_root(response); + *self == block_root + } + fn make_request(&self) -> Self::Request{ + let request = BlocksByRootRequest { + block_roots: VariableList::from(vec![*self]), + }; + request + } + fn wrapped_response(&self, response: Self::Response) -> Self::WrappedResponse { + (*self, response) + } + + fn is_useful(&self, other: &Self) -> bool { + self == other + } +} + +impl RequestableThing for Vec{ + type Request = BlobsByRootRequest; + type Response = Arc>; + type WrappedResponse = Arc>; + + fn verify_response(&self, response: &Self::Response) -> bool{ + true + } + fn make_request(&self) -> Self::Request{ + todo!() + } + + fn wrapped_response(&self, response: Self::Response) -> Self::WrappedResponse { + response + } + + fn is_useful(&self, other: &Self) -> bool { + todo!() + } } #[derive(Debug, PartialEq, Eq)] @@ -72,10 +115,10 @@ pub enum LookupRequestError { NoPeers, } -impl SingleBlockRequest { - pub fn new(hash: Hash256, peer_id: PeerId) -> Self { +impl SingleLookupRequest { + pub fn new(requested_thing: T, peer_id: PeerId) -> Self { Self { - hash, + requested_thing, state: State::AwaitingDownload, available_peers: HashSet::from([peer_id]), used_peers: HashSet::default(), @@ -102,8 +145,8 @@ impl SingleBlockRequest { self.failed_processing + self.failed_downloading } - pub fn add_peer(&mut 
self, hash: &Hash256, peer_id: &PeerId) -> bool { - let is_useful = &self.hash == hash; + pub fn add_peer(&mut self, requested_thing: &T, peer_id: &PeerId) -> bool { + let is_useful = self.requested_thing.is_useful(requested_thing); if is_useful { self.available_peers.insert(*peer_id); } @@ -125,10 +168,10 @@ impl SingleBlockRequest { /// Verifies if the received block matches the requested one. /// Returns the block for processing if the response is what we expected. - pub fn verify_block( + pub fn verify_block( &mut self, - block: Option>>, - ) -> Result>, VerifyError> { + block: Option>, + ) -> Result>, VerifyError> { match self.state { State::AwaitingDownload => { self.register_failure_downloading(); @@ -136,10 +179,7 @@ } State::Downloading { peer_id } => match block { Some(block) => { - // Compute the block root using this specific function so that we can get timing - // metrics. - let block_root = get_block_root(&block); - if block_root != self.hash { + if !self.requested_thing.verify_response(&block) { // return an error and drop the block // NOTE: we take this is as a download failure to prevent counting the // attempt as a chain failure, but simply a peer failure. @@ -148,7 +188,7 @@ } else { // Return the block for processing.
self.state = State::Processing { peer_id }; - Ok(Some((block_root, block))) + Ok(Some(self.requested_thing.wrapped_response(block))) } } None => { @@ -171,16 +211,15 @@ impl SingleBlockRequest { } } - pub fn request_block(&mut self) -> Result<(PeerId, BlocksByRootRequest), LookupRequestError> { + pub fn request_block(&mut self) -> Result<(PeerId, T::Request), LookupRequestError> { debug_assert!(matches!(self.state, State::AwaitingDownload)); if self.failed_attempts() >= MAX_ATTEMPTS { Err(LookupRequestError::TooManyAttempts { cannot_process: self.failed_processing >= self.failed_downloading, }) } else if let Some(&peer_id) = self.available_peers.iter().choose(&mut rand::thread_rng()) { - let request = BlocksByRootRequest { - block_roots: VariableList::from(vec![self.hash]), - }; + let request = self.requested_thing.make_request(); + self.state = State::Downloading { peer_id }; self.used_peers.insert(peer_id); Ok((peer_id, request)) @@ -206,7 +245,7 @@ impl slog::Value for SingleBlockRequest { serializer: &mut dyn slog::Serializer, ) -> slog::Result { serializer.emit_str("request", key)?; - serializer.emit_arguments("hash", &format_args!("{}", self.hash))?; + serializer.emit_arguments("hash", &format_args!("{}", self.requested_thing))?; match &self.state { State::AwaitingDownload => { "awaiting_download".serialize(record, "state", serializer)? diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 70bd419413..ba64dafc87 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -132,6 +132,7 @@ pub enum SyncMessage { /// delay expires. 
MissingBlobs { peer_id: PeerId, + block_root: Hash256, pending_blobs: Vec, search_delay: Duration, }, @@ -632,6 +633,7 @@ impl SyncManager { } SyncMessage::MissingBlobs { peer_id, + block_root, pending_blobs, search_delay, } => { @@ -639,6 +641,7 @@ impl SyncManager { if self.synced_and_connected(&peer_id) { self.block_lookups.search_blobs_delayed( peer_id, + block_root, pending_blobs, search_delay, &mut self.network,