diff --git a/Cargo.lock b/Cargo.lock index 0e669154f5..e7d51d494d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5094,7 +5094,7 @@ dependencies = [ "task_executor", "tokio", "tokio-stream", - "tokio-util 0.6.10", + "tokio-util 0.7.7", "types", ] @@ -8021,6 +8021,7 @@ dependencies = [ "futures-io", "futures-sink", "pin-project-lite 0.2.9", + "slab", "tokio", "tracing", ] diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 4115b2965b..e92c1ceb26 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -297,11 +297,6 @@ pub enum StateSkipConfig { WithoutStateRoots, } -pub enum BlockProcessingResult { - Verified(Hash256), - AvailabilityPending(ExecutedBlock), -} - pub trait BeaconChainTypes: Send + Sync + 'static { type HotStore: store::ItemStore; type ColdStore: store::ItemStore; @@ -2669,10 +2664,14 @@ impl BeaconChain { /// Returns `Ok(block_root)` if the given `unverified_block` was successfully verified and /// imported into the chain. /// + /// For post deneb blocks, this returns a `BlockError::AvailabilityPending` error + /// if the corresponding blobs are not in the required caches. + /// /// Items that implement `IntoExecutionPendingBlock` include: /// /// - `SignedBeaconBlock` /// - `GossipVerifiedBlock` + /// - `BlockWrapper` /// /// ## Errors /// @@ -2691,7 +2690,6 @@ impl BeaconChain { // Increment the Prometheus counter for block processing requests. metrics::inc_counter(&metrics::BLOCK_PROCESSING_REQUESTS); - let slot = unverified_block.block().slot(); let chain = self.clone(); let execution_pending = unverified_block.into_execution_pending_block( @@ -2818,7 +2816,7 @@ impl BeaconChain { /// /// An error is returned if the block was unable to be imported. It may be partially imported /// (i.e., this function is not atomic). - async fn check_availability_and_maybe_import( + pub async fn check_availability_and_maybe_import( self: &Arc, cache_fn: impl FnOnce(Arc) -> Result, AvailabilityCheckError>, count_unrealized: CountUnrealized, diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 68c4d1cebc..7092576f0b 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -686,6 +686,12 @@ pub struct ExecutedBlock { pub payload_verification_outcome: PayloadVerificationOutcome, } +impl std::fmt::Debug for ExecutedBlock { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self.block) + } +} + /// Implemented on types that can be converted into a `ExecutionPendingBlock`. /// /// Used to allow functions to accept blocks at various stages of verification. diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index 95d8a294c1..2b7f17f4b7 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -41,7 +41,7 @@ num_cpus = "1.13.0" lru_cache = { path = "../../common/lru_cache" } if-addrs = "0.6.4" strum = "0.24.0" -tokio-util = { version = "0.6.3", features = ["time"] } +tokio-util = { version = "0.7.7", features = ["time"] } derivative = "2.2.0" delay_map = "0.1.1" ethereum-types = { version = "0.14.1", optional = true } diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index 410389b119..d725342d30 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -42,7 +42,9 @@ use crate::sync::manager::BlockProcessType; use crate::{metrics, service::NetworkMessage, sync::SyncMessage}; use beacon_chain::blob_verification::BlockWrapper; use beacon_chain::parking_lot::Mutex; -use beacon_chain::{BeaconChain, BeaconChainTypes, GossipVerifiedBlock, NotifyExecutionLayer}; +use beacon_chain::{ + BeaconChain, BeaconChainTypes, ExecutedBlock, GossipVerifiedBlock, NotifyExecutionLayer, +}; use derivative::Derivative; use futures::stream::{Stream, StreamExt}; use futures::task::Poll; @@ -83,6 +85,8 @@ mod worker; use crate::beacon_processor::work_reprocessing_queue::QueuedGossipBlock; pub use worker::{ChainSegmentProcessId, GossipAggregatePackage, GossipAttestationPackage}; +use self::work_reprocessing_queue::QueuedExecutedBlock; + /// The maximum size of the channel for work events to the `BeaconProcessor`. /// /// Setting this too low will cause consensus messages to be dropped. @@ -219,6 +223,7 @@ pub const GOSSIP_ATTESTATION_BATCH: &str = "gossip_attestation_batch"; pub const GOSSIP_AGGREGATE: &str = "gossip_aggregate"; pub const GOSSIP_AGGREGATE_BATCH: &str = "gossip_aggregate_batch"; pub const GOSSIP_BLOCK: &str = "gossip_block"; +pub const EXECUTED_BLOCK: &str = "executed_block"; pub const GOSSIP_BLOCK_AND_BLOBS_SIDECAR: &str = "gossip_block_and_blobs_sidecar"; pub const DELAYED_IMPORT_BLOCK: &str = "delayed_import_block"; pub const GOSSIP_VOLUNTARY_EXIT: &str = "gossip_voluntary_exit"; @@ -729,7 +734,7 @@ impl WorkEvent { impl std::convert::From> for WorkEvent { fn from(ready_work: ReadyWork) -> Self { match ready_work { - ReadyWork::Block(QueuedGossipBlock { + ReadyWork::GossipBlock(QueuedGossipBlock { peer_id, block, seen_timestamp, @@ -741,6 +746,18 @@ impl std::convert::From> for WorkEvent { seen_timestamp, }, }, + ReadyWork::ExecutedBlock(QueuedExecutedBlock { + peer_id, + block, + seen_timestamp, + }) => Self { + drop_during_sync: false, + work: Work::ExecutedBlock { + peer_id, + block, + seen_timestamp, + }, + }, ReadyWork::RpcBlock(QueuedRpcBlock { block_root, block, @@ -872,6 +889,11 @@ pub enum Work { block: Box>, seen_timestamp: Duration, }, + ExecutedBlock { + peer_id: PeerId, + block: ExecutedBlock, + seen_timestamp: Duration, + }, GossipVoluntaryExit { message_id: MessageId, peer_id: PeerId, @@ -968,6 +990,7 @@ impl Work { Work::GossipAggregate { .. } => GOSSIP_AGGREGATE, Work::GossipAggregateBatch { .. } => GOSSIP_AGGREGATE_BATCH, Work::GossipBlock { .. } => GOSSIP_BLOCK, + Work::ExecutedBlock { .. } => EXECUTED_BLOCK, Work::GossipSignedBlobSidecar { .. } => GOSSIP_BLOCK_AND_BLOBS_SIDECAR, Work::DelayedImportBlock { .. } => DELAYED_IMPORT_BLOCK, Work::GossipVoluntaryExit { .. } => GOSSIP_VOLUNTARY_EXIT, @@ -1127,6 +1150,7 @@ impl BeaconProcessor { FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN); // Using a FIFO queue since blocks need to be imported sequentially. + let mut executed_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN); let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN); let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN); let mut backfill_chain_segment = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN); @@ -1243,6 +1267,9 @@ impl BeaconProcessor { // on the delayed ones. } else if let Some(item) = delayed_block_queue.pop() { self.spawn_worker(item, toolbox); + // Check availability pending blocks + } else if let Some(item) = executed_block_queue.pop() { + self.spawn_worker(item, toolbox); // Check gossip blocks before gossip attestations, since a block might be // required to verify some attestations. } else if let Some(item) = gossip_block_queue.pop() { @@ -1462,6 +1489,9 @@ impl BeaconProcessor { Work::GossipBlock { .. } => { gossip_block_queue.push(work, work_id, &self.log) } + Work::ExecutedBlock { .. } => { + gossip_block_queue.push(work, work_id, &self.log) + } Work::GossipSignedBlobSidecar { .. } => { gossip_block_and_blobs_sidecar_queue.push(work, work_id, &self.log) } @@ -1742,6 +1772,20 @@ impl BeaconProcessor { ) .await }), + Work::ExecutedBlock { + peer_id, + block, + seen_timestamp, + } => task_spawner.spawn_async(async move { + worker + .process_execution_verified_block( + peer_id, + block, + work_reprocessing_tx, + seen_timestamp, + ) + .await + }), /* * Verification for blobs sidecars received on gossip. */ diff --git a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs index 4d0bdc0027..82111fa6f1 100644 --- a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs +++ b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs @@ -14,7 +14,9 @@ use super::MAX_SCHEDULED_WORK_QUEUE_LEN; use crate::metrics; use crate::sync::manager::BlockProcessType; use beacon_chain::blob_verification::{AsBlock, BlockWrapper}; -use beacon_chain::{BeaconChainTypes, GossipVerifiedBlock, MAXIMUM_GOSSIP_CLOCK_DISPARITY}; +use beacon_chain::{ + BeaconChainTypes, ExecutedBlock, GossipVerifiedBlock, MAXIMUM_GOSSIP_CLOCK_DISPARITY, +}; use fnv::FnvHashMap; use futures::task::Poll; use futures::{Stream, StreamExt}; @@ -53,11 +55,19 @@ pub const QUEUED_LIGHT_CLIENT_UPDATE_DELAY: Duration = Duration::from_secs(12); /// For how long to queue rpc blocks before sending them back for reprocessing. pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(3); +/// For how long to queue executed blocks before sending them back for reprocessing. +pub const QUEUED_EXECUTED_BLOCK_DELAY: Duration = Duration::from_secs(12); + /// Set an arbitrary upper-bound on the number of queued blocks to avoid DoS attacks. The fact that /// we signature-verify blocks before putting them in the queue *should* protect against this, but /// it's nice to have extra protection. const MAXIMUM_QUEUED_BLOCKS: usize = 16; +/// An `ExecutedBlock` contains the entire `BeaconState`, so we shouldn't be storing too many of them +/// to avoid getting DoS'd by the block proposer. +/// TODO(pawan): revise the max blocks +const MAXIMUM_QUEUED_EXECUTED_BLOCKS: usize = 4; + /// How many attestations we keep before new ones get dropped. const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384; @@ -77,6 +87,9 @@ pub enum ReprocessQueueMessage { block_root: Hash256, parent_root: Hash256, }, + ExecutedBlock(QueuedExecutedBlock), + /// The blobs corresponding to a `block_root` are now fully available. + BlobsAvailable(Hash256), /// An unaggregated attestation that references an unknown block. UnknownBlockUnaggregate(QueuedUnaggregate), /// An aggregated attestation that references an unknown block. @@ -87,7 +100,8 @@ pub enum ReprocessQueueMessage { /// Events sent by the scheduler once they are ready for re-processing. pub enum ReadyWork { - Block(QueuedGossipBlock), + GossipBlock(QueuedGossipBlock), + ExecutedBlock(QueuedExecutedBlock), RpcBlock(QueuedRpcBlock), Unaggregate(QueuedUnaggregate), Aggregate(QueuedAggregate), @@ -131,6 +145,14 @@ pub struct QueuedGossipBlock { pub seen_timestamp: Duration, } +/// A block that has been fully verified and is pending data availability +/// and import into the beacon chain. +pub struct QueuedExecutedBlock { + pub peer_id: PeerId, + pub block: ExecutedBlock, + pub seen_timestamp: Duration, +} + /// A block that arrived for processing when the same block was being imported over gossip. /// It is queued for later import. pub struct QueuedRpcBlock { @@ -147,6 +169,9 @@ pub struct QueuedRpcBlock { enum InboundEvent { /// A gossip block that was queued for later processing and is ready for import. ReadyGossipBlock(QueuedGossipBlock), + /// An executed block that was queued for blob availability and is now + /// ready for import + ReadyExecutedBlock(QueuedExecutedBlock), /// A rpc block that was queued because the same gossip block was being imported /// will now be retried for import. ReadyRpcBlock(QueuedRpcBlock), @@ -170,6 +195,8 @@ struct ReprocessQueue { /* Queues */ /// Queue to manage scheduled early blocks. gossip_block_delay_queue: DelayQueue>, + /// Queue to manage availability pending blocks. + executed_block_delay_queue: DelayQueue>, /// Queue to manage scheduled early blocks. rpc_block_delay_queue: DelayQueue>, /// Queue to manage scheduled attestations. @@ -180,6 +207,8 @@ struct ReprocessQueue { /* Queued items */ /// Queued blocks. queued_gossip_block_roots: HashSet, + /// Queued availability pending blocks. + queued_executed_block_roots: HashMap, /// Queued aggregated attestations. queued_aggregates: FnvHashMap, DelayKey)>, /// Queued attestations. @@ -233,13 +262,21 @@ impl Stream for ReprocessQueue { // The sequential nature of blockchains means it is generally better to try and import all // existing blocks before new ones. match self.gossip_block_delay_queue.poll_expired(cx) { - Poll::Ready(Some(Ok(queued_block))) => { + Poll::Ready(Some(queued_block)) => { return Poll::Ready(Some(InboundEvent::ReadyGossipBlock( queued_block.into_inner(), ))); } - Poll::Ready(Some(Err(e))) => { - return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "gossip_block_queue"))); + // `Poll::Ready(None)` means that there are no more entries in the delay queue and we + // will continue to get this result until something else is added into the queue. + Poll::Ready(None) | Poll::Pending => (), + } + + match self.executed_block_delay_queue.poll_expired(cx) { + Poll::Ready(Some(queued_block)) => { + return Poll::Ready(Some(InboundEvent::ReadyExecutedBlock( + queued_block.into_inner(), + ))); } // `Poll::Ready(None)` means that there are no more entries in the delay queue and we // will continue to get this result until something else is added into the queue. @@ -247,40 +284,31 @@ impl Stream for ReprocessQueue { } match self.rpc_block_delay_queue.poll_expired(cx) { - Poll::Ready(Some(Ok(queued_block))) => { + Poll::Ready(Some(queued_block)) => { return Poll::Ready(Some(InboundEvent::ReadyRpcBlock(queued_block.into_inner()))); } - Poll::Ready(Some(Err(e))) => { - return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "rpc_block_queue"))); - } // `Poll::Ready(None)` means that there are no more entries in the delay queue and we // will continue to get this result until something else is added into the queue. Poll::Ready(None) | Poll::Pending => (), } match self.attestations_delay_queue.poll_expired(cx) { - Poll::Ready(Some(Ok(attestation_id))) => { + Poll::Ready(Some(attestation_id)) => { return Poll::Ready(Some(InboundEvent::ReadyAttestation( attestation_id.into_inner(), ))); } - Poll::Ready(Some(Err(e))) => { - return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "attestations_queue"))); - } // `Poll::Ready(None)` means that there are no more entries in the delay queue and we // will continue to get this result until something else is added into the queue. Poll::Ready(None) | Poll::Pending => (), } match self.lc_updates_delay_queue.poll_expired(cx) { - Poll::Ready(Some(Ok(lc_id))) => { + Poll::Ready(Some(lc_id)) => { return Poll::Ready(Some(InboundEvent::ReadyLightClientUpdate( lc_id.into_inner(), ))); } - Poll::Ready(Some(Err(e))) => { - return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "lc_updates_queue"))); - } // `Poll::Ready(None)` means that there are no more entries in the delay queue and we // will continue to get this result until something else is added into the queue. Poll::Ready(None) | Poll::Pending => (), @@ -313,10 +341,12 @@ pub fn spawn_reprocess_scheduler( work_reprocessing_rx, ready_work_tx, gossip_block_delay_queue: DelayQueue::new(), + executed_block_delay_queue: DelayQueue::new(), rpc_block_delay_queue: DelayQueue::new(), attestations_delay_queue: DelayQueue::new(), lc_updates_delay_queue: DelayQueue::new(), queued_gossip_block_roots: HashSet::new(), + queued_executed_block_roots: HashMap::new(), queued_lc_updates: FnvHashMap::default(), queued_aggregates: FnvHashMap::default(), queued_unaggregates: FnvHashMap::default(), @@ -400,7 +430,7 @@ impl ReprocessQueue { if block_slot <= now && self .ready_work_tx - .try_send(ReadyWork::Block(early_block)) + .try_send(ReadyWork::GossipBlock(early_block)) .is_err() { error!( @@ -411,6 +441,59 @@ impl ReprocessQueue { } } } + InboundEvent::Msg(ExecutedBlock(executed_block)) => { + if self.executed_block_delay_queue.len() >= MAXIMUM_QUEUED_EXECUTED_BLOCKS { + // TODO(use your own debounce) + if self.rpc_block_debounce.elapsed() { + warn!( + log, + "Executed blocks queue is full"; + "queue_size" => MAXIMUM_QUEUED_EXECUTED_BLOCKS, + "msg" => "check system clock" + ); + } + // TODO(pawan): block would essentially get dropped here + // can the devs do something? + } + // Queue the block for a slot + let block_root = executed_block.block.block_root; + if !self.queued_executed_block_roots.contains_key(&block_root) { + let key = self + .executed_block_delay_queue + .insert(executed_block, QUEUED_EXECUTED_BLOCK_DELAY); + + self.queued_executed_block_roots.insert(block_root, key); + } + } + InboundEvent::Msg(BlobsAvailable(block_root)) => { + match self.queued_executed_block_roots.remove(&block_root) { + None => { + // Log an error to alert that we've made a bad assumption about how this + // program works, but still process the block anyway. + error!( + log, + "Unknown executed block in delay queue"; + "block_root" => ?block_root + ); + } + Some(key) => { + if let Some(executed_block) = + self.executed_block_delay_queue.try_remove(&key) + { + if self + .ready_work_tx + .try_send(ReadyWork::ExecutedBlock(executed_block.into_inner())) + .is_err() + { + error!( + log, + "Failed to pop queued block"; + ); + } + } + } + } + } // A rpc block arrived for processing at the same time when a gossip block // for the same block hash is being imported. We wait for `QUEUED_RPC_BLOCK_DELAY` // and then send the rpc block back for processing assuming the gossip import @@ -664,6 +747,24 @@ impl ReprocessQueue { } } } + InboundEvent::ReadyExecutedBlock(executed_block) => { + let block_root = executed_block.block.block_root; + + if self + .queued_executed_block_roots + .remove(&block_root) + .is_none() + { + // Log an error to alert that we've made a bad assumption about how this + // program works, but still process the block anyway. + error!( + log, + "Unknown block in delay queue"; + "block_root" => ?block_root + ); + } + // TODO(pawan): just dropping the block, rethink what can be done here + } // A block that was queued for later processing is now ready to be processed. InboundEvent::ReadyGossipBlock(ready_block) => { let block_root = ready_block.block.block_root; @@ -680,7 +781,7 @@ impl ReprocessQueue { if self .ready_work_tx - .try_send(ReadyWork::Block(ready_block)) + .try_send(ReadyWork::GossipBlock(ready_block)) .is_err() { error!( @@ -689,6 +790,7 @@ impl ReprocessQueue { ); } } + InboundEvent::DelayQueueError(e, queue_name) => { crit!( log, diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index a362efcf3f..91584ee303 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -2,6 +2,7 @@ use crate::{metrics, service::NetworkMessage, sync::SyncMessage}; use beacon_chain::blob_verification::{AsBlock, BlockWrapper, GossipVerifiedBlob}; use beacon_chain::store::Error; +use beacon_chain::ExecutedBlock; use beacon_chain::{ attestation_verification::{self, Error as AttnError, VerifiedAttestation}, light_client_finality_update_verification::Error as LightClientFinalityUpdateError, @@ -30,8 +31,8 @@ use types::{ use super::{ super::work_reprocessing_queue::{ - QueuedAggregate, QueuedGossipBlock, QueuedLightClientUpdate, QueuedUnaggregate, - ReprocessQueueMessage, + QueuedAggregate, QueuedExecutedBlock, QueuedGossipBlock, QueuedLightClientUpdate, + QueuedUnaggregate, ReprocessQueueMessage, }, Worker, }; @@ -987,6 +988,104 @@ impl Worker { } } + /// Process the beacon block that has already passed gossip verification. + /// + /// Raises a log if there are errors. + pub async fn process_execution_verified_block( + self, + peer_id: PeerId, + executed_block: ExecutedBlock, + reprocess_tx: mpsc::Sender>, + // This value is not used presently, but it might come in handy for debugging. + seen_duration: Duration, + ) { + let block_root = executed_block.block_root; + let block = executed_block.block.block_cloned(); + + match self + .chain + .check_availability_and_maybe_import( + |chain| { + chain + .data_availability_checker + .check_block_availability(executed_block) + }, + CountUnrealized::True, + ) + .await + { + Ok(AvailabilityProcessingStatus::Imported(block_root)) => { + metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL); + + if reprocess_tx + .try_send(ReprocessQueueMessage::BlockImported { + block_root, + parent_root: block.message().parent_root(), + }) + .is_err() + { + error!( + self.log, + "Failed to inform block import"; + "source" => "gossip", + "block_root" => ?block_root, + ) + }; + + debug!( + self.log, + "Gossipsub block processed"; + "block" => ?block_root, + "peer_id" => %peer_id + ); + + self.chain.recompute_head_at_current_slot().await; + } + Ok(AvailabilityProcessingStatus::PendingBlobs(_)) + | Ok(AvailabilityProcessingStatus::PendingBlock(_)) + | Err(BlockError::AvailabilityCheck(_)) => { + // TODO(need to do something different if it's unavailble again) + unimplemented!() + } + Err(BlockError::ParentUnknown(block)) => { + // Inform the sync manager to find parents for this block + // This should not occur. It should be checked by `should_forward_block` + error!( + self.log, + "Block with unknown parent attempted to be processed"; + "peer_id" => %peer_id + ); + self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block, block_root)); + } + Err(ref e @ BlockError::ExecutionPayloadError(ref epe)) if !epe.penalize_peer() => { + debug!( + self.log, + "Failed to verify execution payload"; + "error" => %e + ); + } + other => { + debug!( + self.log, + "Invalid gossip beacon block"; + "outcome" => ?other, + "block root" => ?block_root, + "block slot" => block.slot() + ); + self.gossip_penalize_peer( + peer_id, + PeerAction::MidToleranceError, + "bad_gossip_block_ssz", + ); + trace!( + self.log, + "Invalid gossip beacon block ssz"; + "ssz" => format_args!("0x{}", hex::encode(block.as_ssz_bytes())), + ); + } + }; + } + /// Process the beacon block that has already passed gossip verification. /// /// Raises a log if there are errors. @@ -996,7 +1095,7 @@ impl Worker { verified_block: GossipVerifiedBlock, reprocess_tx: mpsc::Sender>, // This value is not used presently, but it might come in handy for debugging. - _seen_duration: Duration, + seen_duration: Duration, ) { let block = verified_block.block.block_cloned(); let block_root = verified_block.block_root; @@ -1044,6 +1143,32 @@ impl Worker { } Ok(AvailabilityProcessingStatus::PendingBlobs(blob_ids)) => { // make rpc request for blob + // let block_slot = block.block.slot(); + // // Make rpc request for blobs + // self.send_sync_message(SyncMessage::UnknownBlobHash { + // peer_id, + // block_root: block.block_root, + // }); + + // // Send block to reprocessing queue to await blobs + // if reprocess_tx + // .try_send(ReprocessQueueMessage::ExecutedBlock(QueuedExecutedBlock { + // peer_id, + // block, + // seen_timestamp: seen_duration, + // })) + // .is_err() + // { + // error!( + // self.log, + // "Failed to send partially verified block to reprocessing queue"; + // "block_slot" => %block_slot, + // "block_root" => ?block_root, + // "location" => "block gossip" + // ) + // } + } + Err(BlockError::AvailabilityCheck(_)) => { todo!() } Err(BlockError::ParentUnknown(block)) => { diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index b9774ffa81..cc043c6269 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -119,6 +119,14 @@ pub enum SyncMessage { /// manager to attempt to find the block matching the unknown hash. UnknownBlockHash(PeerId, Hash256), + /// A peer has sent us a block that we haven't received all the blobs for. This triggers + /// the manager to attempt to find a blobs for the given block root. + /// TODO: add required blob indices as well. + UnknownBlobHash { + peer_id: PeerId, + block_root: Hash256, + }, + /// A peer has disconnected. Disconnect(PeerId), @@ -598,6 +606,9 @@ impl SyncManager { .search_block(block_hash, peer_id, &mut self.network); } } + SyncMessage::UnknownBlobHash { .. } => { + unimplemented!() + } SyncMessage::Disconnect(peer_id) => { self.peer_disconnect(&peer_id); }