From 8d80200bc4aaf5613642c057ef942ac625b387c4 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Tue, 28 Mar 2023 18:29:56 -0400 Subject: [PATCH] some blob reprocessing work --- .../beacon_processor/worker/gossip_methods.rs | 17 ++++- .../beacon_processor/worker/sync_methods.rs | 3 +- .../network/src/sync/block_lookups/mod.rs | 42 +++++++++++ beacon_node/network/src/sync/manager.rs | 73 +++++++++++++++---- 4 files changed, 115 insertions(+), 20 deletions(-) diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 69e167b4d0..431a878cae 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -692,12 +692,17 @@ impl Worker { // logging } Ok(AvailabilityProcessingStatus::PendingBlobs(pending_blobs)) => self - .send_sync_message(SyncMessage::UnknownBlobHash { + .send_sync_message(SyncMessage::MissingBlobs { peer_id, pending_blobs, + search_delay: Duration::from_secs(0), //TODO(sean) update }), Ok(AvailabilityProcessingStatus::PendingBlock(block_hash)) => { - self.send_sync_message(SyncMessage::UnknownBlockHash(peer_id, block_hash)); + self.send_sync_message(SyncMessage::UnknownBlockHashFromGossipBlob( + peer_id, + block_hash, + Duration::from_secs(0), + )); //TODO(sean) update } Err(_err) => { // handle errors @@ -1061,9 +1066,10 @@ impl Worker { } Ok(AvailabilityProcessingStatus::PendingBlobs(pending_blobs)) => { // make rpc request for blob - self.send_sync_message(SyncMessage::UnknownBlobHash { + self.send_sync_message(SyncMessage::MissingBlobs { peer_id, pending_blobs, + search_delay: Duration::from_secs(0), //TODO(sean) update }); } Err(BlockError::AvailabilityCheck(_)) => { @@ -1902,7 +1908,10 @@ impl Worker { // We don't know the block, get the sync manager to handle the block lookup, and // send the attestation to be scheduled for re-processing. self.sync_tx - .send(SyncMessage::UnknownBlockHash(peer_id, *beacon_block_root)) + .send(SyncMessage::UnknownBlockHashFromAttestation( + peer_id, + *beacon_block_root, + )) .unwrap_or_else(|_| { warn!( self.log, diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs index 09e98cb183..e718a01d64 100644 --- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -101,7 +101,8 @@ impl Worker { metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL); // RPC block imported, regardless of process type - //TODO(sean) handle pending availability variants + //TODO(sean) do we need to do anything here for missing blobs? or is passing the result + // along to sync enough? if let &Ok(AvailabilityProcessingStatus::Imported(hash)) = &result { info!(self.log, "New RPC block received"; "slot" => slot, "hash" => %hash); diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 77e659a268..8f7b9d4549 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -1,5 +1,6 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; +use std::thread::sleep; use std::time::Duration; use beacon_chain::blob_verification::AsBlock; @@ -12,6 +13,7 @@ use lru_cache::LRUTimeCache; use slog::{debug, error, trace, warn, Logger}; use smallvec::SmallVec; use store::Hash256; +use types::blob_sidecar::BlobIdentifier; use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent}; use crate::metrics; @@ -133,6 +135,37 @@ impl BlockLookups { } } + pub fn search_blobs( + &mut self, + blob_ids: Vec, + peer_id: PeerId, + cx: &mut SyncNetworkContext, + ) { + todo!() + } + + pub fn search_block_delayed( + &mut self, + peer_id: PeerId, + hash: Hash256, + delay: Duration, + cx: &mut SyncNetworkContext, + ) { + //TODO(sean) handle delay + self.search_block(hash, peer_id, cx); + } + + pub fn search_blobs_delayed( + &mut self, + peer_id: PeerId, + blob_ids: Vec, + delay: Duration, + cx: &mut SyncNetworkContext, + ) { + //TODO(sean) handle delay + self.search_blobs(blob_ids, peer_id, cx); + } + /// If a block is attempted to be processed but we do not know its parent, this function is /// called in order to find the block's parent. pub fn search_parent( @@ -460,6 +493,9 @@ impl BlockLookups { BlockProcessResult::Ok => { trace!(self.log, "Single block processing succeeded"; "block" => %root); } + BlockProcessResult::MissingBlobs(blobs) => { + todo!() + } BlockProcessResult::Ignored => { // Beacon processor signalled to ignore the block processing result. // This implies that the cpu is overloaded. Drop the request. @@ -543,6 +579,9 @@ impl BlockLookups { BlockProcessResult::Ok => { trace!(self.log, "Parent block processing succeeded"; &parent_lookup) } + BlockProcessResult::MissingBlobs(blobs) => { + todo!() + } BlockProcessResult::Err(e) => { trace!(self.log, "Parent block processing failed"; &parent_lookup, "error" => %e) } @@ -557,6 +596,9 @@ impl BlockLookups { } match result { + BlockProcessResult::MissingBlobs(blobs) => { + todo!() + } BlockProcessResult::Err(BlockError::ParentUnknown(block)) => { // need to keep looking for parents // add the block back to the queue and continue the search diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 482bfab708..432c829ae1 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -44,7 +44,9 @@ use crate::status::ToStatusMessage; use crate::sync::range_sync::ByRangeRequestType; use beacon_chain::blob_verification::AsBlock; use beacon_chain::blob_verification::BlockWrapper; -use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, EngineState}; +use beacon_chain::{ + AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError, EngineState, +}; use futures::StreamExt; use lighthouse_network::rpc::methods::MAX_REQUEST_BLOCKS; use lighthouse_network::rpc::RPCError; @@ -117,15 +119,21 @@ pub enum SyncMessage { /// A block with an unknown parent has been received. UnknownBlock(PeerId, BlockWrapper, Hash256), - /// A peer has sent an object that references a block that is unknown. This triggers the + /// A peer has sent an attestation that references a block that is unknown. This triggers the /// manager to attempt to find the block matching the unknown hash. - UnknownBlockHash(PeerId, Hash256), + UnknownBlockHashFromAttestation(PeerId, Hash256), + + /// A peer has sent a blob that references a block that is unknown. This triggers the + /// manager to attempt to find the block matching the unknown hash when the specified delay expires. + UnknownBlockHashFromGossipBlob(PeerId, Hash256, Duration), /// A peer has sent us a block that we haven't received all the blobs for. This triggers - /// the manager to attempt to find the pending blobs for the given block root. - UnknownBlobHash { + /// the manager to attempt to find the pending blobs for the given block root when the specified + /// delay expires. + MissingBlobs { peer_id: PeerId, pending_blobs: Vec, + search_delay: Duration, }, /// A peer has disconnected. @@ -161,6 +169,7 @@ pub enum BlockProcessType { #[derive(Debug)] pub enum BlockProcessResult { Ok, + MissingBlobs(Vec), Err(BlockError), Ignored, } @@ -597,18 +606,38 @@ impl SyncManager { .search_parent(block_root, block, peer_id, &mut self.network); } } - SyncMessage::UnknownBlockHash(peer_id, block_hash) => { + SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_hash) => { // If we are not synced, ignore this block. - if self.network_globals.sync_state.read().is_synced() - && self.network_globals.peers.read().is_connected(&peer_id) - && self.network.is_execution_engine_online() - { + if self.synced_and_connected(&peer_id) { self.block_lookups .search_block(block_hash, peer_id, &mut self.network); } } - SyncMessage::UnknownBlobHash { .. } => { - unimplemented!() + SyncMessage::UnknownBlockHashFromGossipBlob(peer_id, block_hash, delay) => { + // If we are not synced, ignore this block. + if self.synced_and_connected(&peer_id) { + self.block_lookups.search_block_delayed( + peer_id, + block_hash, + delay, + &mut self.network, + ); + } + } + SyncMessage::MissingBlobs { + peer_id, + pending_blobs, + search_delay, + } => { + // If we are not synced, ignore these blobs. + if self.synced_and_connected(&peer_id) { + self.block_lookups.search_blobs_delayed( + peer_id, + pending_blobs, + search_delay, + &mut self.network, + ); + } } SyncMessage::Disconnect(peer_id) => { self.peer_disconnect(&peer_id); @@ -673,6 +702,12 @@ impl SyncManager { } } + fn synced_and_connected(&mut self, peer_id: &PeerId) -> bool { + self.network_globals.sync_state.read().is_synced() + && self.network_globals.peers.read().is_connected(&peer_id) + && self.network.is_execution_engine_online() + } + fn handle_new_execution_engine_state(&mut self, engine_state: EngineState) { self.network.update_execution_engine_state(engine_state); @@ -923,10 +958,18 @@ impl SyncManager { } } -impl From>> for BlockProcessResult { - fn from(result: Result>) -> Self { +impl From>> + for BlockProcessResult +{ + fn from(result: Result>) -> Self { match result { - Ok(_) => BlockProcessResult::Ok, + Ok(AvailabilityProcessingStatus::Imported(_)) => BlockProcessResult::Ok, + Ok(AvailabilityProcessingStatus::PendingBlock(_)) => { + todo!() // doesn't make sense + } + Ok(AvailabilityProcessingStatus::PendingBlobs(blobs)) => { + BlockProcessResult::MissingBlobs(blobs) + } Err(e) => e.into(), } }