diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index d826895a25..67deb88f6f 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3394,6 +3394,7 @@ impl BeaconChain { { return Err(BlockError::ParentUnknown { parent_root: blob.block_parent_root(), + parent_block_hash: None, }); } } @@ -3520,7 +3521,10 @@ impl BeaconChain { .fork_choice_read_lock() .contains_block(&parent_root) { - return Err(BlockError::ParentUnknown { parent_root }); + return Err(BlockError::ParentUnknown { + parent_root, + parent_block_hash: None, + }); } self.emit_sse_data_column_sidecar_events( diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 46efdcdf9f..5ec64b36a4 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -92,9 +92,10 @@ use std::fs; use std::io::Write; use std::sync::Arc; use store::{Error as DBError, KeyValueStore}; -use strum::AsRefStr; +use strum::{AsRefStr, IntoStaticStr}; use task_executor::JoinHandle; use tracing::{Instrument, Span, debug, debug_span, error, info_span, instrument}; +use types::ExecutionBlockHash; use types::{ BeaconBlockRef, BeaconState, BeaconStateError, BlobsList, ChainSpec, DataColumnSidecarList, Epoch, EthSpec, FullPayload, Hash256, InconsistentFork, KzgProofs, RelativeEpoch, @@ -114,7 +115,7 @@ const WRITE_BLOCK_PROCESSING_SSZ: bool = cfg!(feature = "write_ssz_files"); /// /// - The block is malformed/invalid (indicated by all results other than `BeaconChainError`. /// - We encountered an error whilst trying to verify the block (a `BeaconChainError`). -#[derive(Debug, AsRefStr)] +#[derive(Debug, AsRefStr, IntoStaticStr)] pub enum BlockError { /// The parent block was unknown. /// @@ -122,7 +123,10 @@ pub enum BlockError { /// /// It's unclear if this block is valid, but it cannot be processed without already knowing /// its parent. - ParentUnknown { parent_root: Hash256 }, + ParentUnknown { + parent_root: Hash256, + parent_block_hash: Option, + }, /// The block slot is greater than the present slot. /// /// ## Peer scoring @@ -336,7 +340,7 @@ impl From for BlockError { /// Returned when block validation failed due to some issue verifying /// the execution payload. -#[derive(Debug)] +#[derive(Debug, IntoStaticStr)] pub enum ExecutionPayloadError { /// There's no eth1 connection (mandatory after merge) /// @@ -1404,6 +1408,7 @@ impl ExecutionPendingBlock { // genesis). return Err(BlockError::ParentUnknown { parent_root: block.parent_root(), + parent_block_hash: block.as_block().parent_block_hash(), }); } } @@ -1779,6 +1784,7 @@ pub fn check_block_is_finalized_checkpoint_or_descendant< } else { Err(BlockError::ParentUnknown { parent_root: block.parent_root(), + parent_block_hash: block.as_block().parent_block_hash(), }) } } @@ -1882,6 +1888,7 @@ fn verify_parent_block_is_known( | ParentImportedStatus::UnknownBlock | ParentImportedStatus::UnimportedPayload => Err(BlockError::ParentUnknown { parent_root: block.parent_root(), + parent_block_hash: block.parent_block_hash(), }), } } @@ -1913,6 +1920,7 @@ fn load_parent>( { return Err(BlockError::ParentUnknown { parent_root: block.parent_root(), + parent_block_hash: block.as_block().parent_block_hash(), }); } diff --git a/beacon_node/beacon_chain/src/data_availability_checker/error.rs b/beacon_node/beacon_chain/src/data_availability_checker/error.rs index ab69a62985..2653c84860 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/error.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/error.rs @@ -1,7 +1,8 @@ use kzg::{Error as KzgError, KzgCommitment}; +use strum::IntoStaticStr; use types::{BeaconStateError, ColumnIndex, Hash256}; -#[derive(Debug)] +#[derive(Debug, IntoStaticStr)] pub enum Error { InvalidBlobs(KzgError), MissingBid(Hash256), diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index e0c39c350b..a85510dd10 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -1305,7 +1305,7 @@ async fn block_gossip_verification() { assert!( matches!( unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block(block, signature))).await), - BlockError::ParentUnknown {parent_root: p} + BlockError::ParentUnknown {parent_root: p, ..} if p == parent_root ), "should not import a block for an unknown parent" diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 3b851fcba7..561fa16f03 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -41,8 +41,8 @@ pub use crate::scheduler::BeaconProcessorQueueLengths; use crate::scheduler::work_queue::WorkQueues; use crate::work_reprocessing_queue::{ - QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, QueuedGossipEnvelope, - ReprocessQueueMessage, + QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, QueuedGossipDataColumn, + QueuedGossipEnvelope, ReprocessQueueMessage, }; use futures::stream::{Stream, StreamExt}; use futures::task::Poll; @@ -304,6 +304,10 @@ impl From for WorkEvent { work: Work::ColumnReconstruction(process_fn), } } + ReadyWork::DataColumn(QueuedGossipDataColumn { process_fn, .. }) => Self { + drop_during_sync: true, + work: Work::UnknownBlockDataColumn { process_fn }, + }, } } } @@ -369,6 +373,9 @@ pub enum Work { UnknownBlockAttestation { process_fn: BlockingFn, }, + UnknownBlockDataColumn { + process_fn: BlockingFn, + }, GossipAttestationBatch { attestations: GossipAttestationBatch, process_batch: Box, @@ -466,6 +473,7 @@ pub enum WorkType { GossipAttestation, GossipAttestationToConvert, UnknownBlockAttestation, + UnknownBlockDataColumn, GossipAttestationBatch, GossipAggregate, UnknownBlockAggregate, @@ -571,6 +579,7 @@ impl Work { Work::LightClientFinalityUpdateRequest(_) => WorkType::LightClientFinalityUpdateRequest, Work::LightClientUpdatesByRangeRequest(_) => WorkType::LightClientUpdatesByRangeRequest, Work::UnknownBlockAttestation { .. } => WorkType::UnknownBlockAttestation, + Work::UnknownBlockDataColumn { .. } => WorkType::UnknownBlockDataColumn, Work::UnknownBlockAggregate { .. } => WorkType::UnknownBlockAggregate, Work::UnknownLightClientOptimisticUpdate { .. } => { WorkType::UnknownLightClientOptimisticUpdate @@ -844,6 +853,9 @@ impl BeaconProcessor { Some(item) } else if let Some(item) = work_queues.gossip_data_column_queue.pop() { Some(item) + } else if let Some(item) = work_queues.unknown_block_data_column_queue.pop() + { + Some(item) } else if let Some(item) = work_queues.gossip_partial_data_column_queue.pop() { @@ -1240,6 +1252,9 @@ impl BeaconProcessor { Work::UnknownBlockAttestation { .. } => { work_queues.unknown_block_attestation_queue.push(work) } + Work::UnknownBlockDataColumn { .. } => work_queues + .unknown_block_data_column_queue + .push(work, work_id), Work::UnknownBlockAggregate { .. } => { work_queues.unknown_block_aggregate_queue.push(work) } @@ -1290,6 +1305,9 @@ impl BeaconProcessor { WorkType::UnknownBlockAttestation => { work_queues.unknown_block_attestation_queue.len() } + WorkType::UnknownBlockDataColumn => { + work_queues.unknown_block_data_column_queue.len() + } WorkType::GossipAttestationBatch => 0, // No queue WorkType::GossipAggregate => work_queues.aggregate_queue.len(), WorkType::UnknownBlockAggregate => { @@ -1506,6 +1524,7 @@ impl BeaconProcessor { }), Work::UnknownBlockAttestation { process_fn } | Work::UnknownBlockAggregate { process_fn } + | Work::UnknownBlockDataColumn { process_fn } | Work::UnknownLightClientOptimisticUpdate { process_fn, .. } => { task_spawner.spawn_blocking(process_fn) } diff --git a/beacon_node/beacon_processor/src/scheduler/work_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_queue.rs index ebd66e743d..cc03feac51 100644 --- a/beacon_node/beacon_processor/src/scheduler/work_queue.rs +++ b/beacon_node/beacon_processor/src/scheduler/work_queue.rs @@ -111,6 +111,7 @@ pub struct BeaconProcessorQueueLengths { attestation_queue: usize, unknown_block_aggregate_queue: usize, unknown_block_attestation_queue: usize, + unknown_block_data_column_queue: usize, sync_message_queue: usize, sync_contribution_queue: usize, gossip_voluntary_exit_queue: usize, @@ -174,6 +175,8 @@ impl BeaconProcessorQueueLengths { Ok(Self { aggregate_queue: 4096, unknown_block_aggregate_queue: 1024, + // Capacity for two slot's worth of data columns for a supernode. + unknown_block_data_column_queue: 256, // Capacity for a full slot's worth of attestations if subscribed to all subnets attestation_queue: std::cmp::max( active_validator_count / slots_per_epoch, @@ -245,6 +248,7 @@ pub struct WorkQueues { pub attestation_debounce: TimeLatch, pub unknown_block_aggregate_queue: LifoQueue>, pub unknown_block_attestation_queue: LifoQueue>, + pub unknown_block_data_column_queue: FifoQueue>, pub sync_message_queue: LifoQueue>, pub sync_contribution_queue: LifoQueue>, pub gossip_voluntary_exit_queue: FifoQueue>, @@ -302,6 +306,8 @@ impl WorkQueues { LifoQueue::new(queue_lengths.unknown_block_aggregate_queue); let unknown_block_attestation_queue = LifoQueue::new(queue_lengths.unknown_block_attestation_queue); + let unknown_block_data_column_queue = + FifoQueue::new(queue_lengths.unknown_block_data_column_queue); let sync_message_queue = LifoQueue::new(queue_lengths.sync_message_queue); let sync_contribution_queue = LifoQueue::new(queue_lengths.sync_contribution_queue); @@ -383,6 +389,7 @@ impl WorkQueues { attestation_debounce, unknown_block_aggregate_queue, unknown_block_attestation_queue, + unknown_block_data_column_queue, sync_message_queue, sync_contribution_queue, gossip_voluntary_exit_queue, diff --git a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs index b1fa56af01..62ed86fbad 100644 --- a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs +++ b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs @@ -52,6 +52,10 @@ pub const QUEUED_ATTESTATION_DELAY: Duration = Duration::from_secs(12); /// For how long to queue light client updates for re-processing. pub const QUEUED_LIGHT_CLIENT_UPDATE_DELAY: Duration = Duration::from_secs(12); +/// Data column timeout as a multiplier of slot duration. Columns waiting for their block will be +/// sent for processing after this many slots worth of time, even if the block hasn't arrived. +const QUEUED_DATA_COLUMN_DELAY_SLOTS: u32 = 1; + /// Envelope timeout as a multiplier of slot duration. Envelopes waiting for their block will be /// sent for processing after this many slots worth of time, even if the block hasn't arrived. const QUEUED_ENVELOPE_DELAY_SLOTS: u32 = 1; @@ -76,6 +80,9 @@ const MAXIMUM_QUEUED_ENVELOPES: usize = 16; /// How many attestations we keep before new ones get dropped. const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384; +/// How many columns we keep before new ones get dropped. +const MAXIMUM_QUEUED_DATA_COLUMNS: usize = 256; + /// How many light client updates we keep before new ones get dropped. const MAXIMUM_QUEUED_LIGHT_CLIENT_UPDATES: usize = 128; @@ -123,6 +130,8 @@ pub enum ReprocessQueueMessage { UnknownLightClientOptimisticUpdate(QueuedLightClientUpdate), /// A new backfill batch that needs to be scheduled for processing. BackfillSync(QueuedBackfillBatch), + /// A gossip data column that references an unknown block. + UnknownBlockDataColumn(QueuedGossipDataColumn), /// A delayed column reconstruction that needs checking DelayColumnReconstruction(QueuedColumnReconstruction), } @@ -138,6 +147,7 @@ pub enum ReadyWork { LightClientUpdate(QueuedLightClientUpdate), BackfillSync(QueuedBackfillBatch), ColumnReconstruction(QueuedColumnReconstruction), + DataColumn(QueuedGossipDataColumn), } /// An Attestation for which the corresponding block was not seen while processing, queued for @@ -200,6 +210,12 @@ pub struct QueuedColumnReconstruction { pub process_fn: AsyncFn, } +/// A gossip data column that references an unknown block, queued for later reprocessing. +pub struct QueuedGossipDataColumn { + pub beacon_block_root: Hash256, + pub process_fn: BlockingFn, +} + impl TryFrom> for QueuedBackfillBatch { type Error = WorkEvent; @@ -240,6 +256,8 @@ enum InboundEvent { ReadyBackfillSync(QueuedBackfillBatch), /// A column reconstruction that was queued is ready for processing. ReadyColumnReconstruction(QueuedColumnReconstruction), + /// A gossip data column that is ready for re-processing. + ReadyDataColumn(Hash256), /// A message sent to the `ReprocessQueue` Msg(ReprocessQueueMessage), } @@ -264,6 +282,8 @@ struct ReprocessQueue { lc_updates_delay_queue: DelayQueue, /// Queue to manage scheduled column reconstructions. column_reconstructions_delay_queue: DelayQueue, + /// Queue to manage gossip data column timeouts. + data_columns_delay_queue: DelayQueue, /* Queued items */ /// Queued blocks. @@ -284,6 +304,10 @@ struct ReprocessQueue { queued_column_reconstructions: HashMap>, /// Queued backfill batches queued_backfill_batches: Vec, + /// Queued gossip data columns awaiting their block, keyed by block root. + awaiting_data_columns_per_root: HashMap, DelayKey)>, + /// Total number of queued gossip data columns across all roots. + queued_data_columns_count: usize, /* Aux */ /// Next attestation id, used for both aggregated and unaggregated attestations @@ -294,6 +318,7 @@ struct ReprocessQueue { rpc_block_debounce: TimeLatch, attestation_delay_debounce: TimeLatch, lc_update_delay_debounce: TimeLatch, + data_column_delay_debounce: TimeLatch, next_backfill_batch_event: Option>>, slot_clock: Arc, } @@ -387,6 +412,13 @@ impl Stream for ReprocessQueue { Poll::Ready(None) | Poll::Pending => (), } + match self.data_columns_delay_queue.poll_expired(cx) { + Poll::Ready(Some(block_root)) => { + return Poll::Ready(Some(InboundEvent::ReadyDataColumn(block_root.into_inner()))); + } + Poll::Ready(None) | Poll::Pending => (), + } + if let Some(next_backfill_batch_event) = self.next_backfill_batch_event.as_mut() { match next_backfill_batch_event.as_mut().poll(cx) { Poll::Ready(_) => { @@ -455,6 +487,7 @@ impl ReprocessQueue { attestations_delay_queue: DelayQueue::new(), lc_updates_delay_queue: DelayQueue::new(), column_reconstructions_delay_queue: DelayQueue::new(), + data_columns_delay_queue: DelayQueue::new(), queued_gossip_block_roots: HashSet::new(), awaiting_envelopes_per_root: HashMap::new(), queued_lc_updates: FnvHashMap::default(), @@ -464,6 +497,8 @@ impl ReprocessQueue { awaiting_lc_updates_per_parent_root: HashMap::new(), queued_backfill_batches: Vec::new(), queued_column_reconstructions: HashMap::new(), + awaiting_data_columns_per_root: HashMap::new(), + queued_data_columns_count: 0, next_attestation: 0, next_lc_update: 0, early_block_debounce: TimeLatch::default(), @@ -471,6 +506,7 @@ impl ReprocessQueue { rpc_block_debounce: TimeLatch::default(), attestation_delay_debounce: TimeLatch::default(), lc_update_delay_debounce: TimeLatch::default(), + data_column_delay_debounce: TimeLatch::default(), next_backfill_batch_event: None, slot_clock, } @@ -551,22 +587,16 @@ impl ReprocessQueue { return; } - // When the queue is full, evict the oldest entry to make room for newer envelopes. + // When the queue is full, drop the new envelope. if self.awaiting_envelopes_per_root.len() >= MAXIMUM_QUEUED_ENVELOPES { if self.envelope_delay_debounce.elapsed() { warn!( queue_size = MAXIMUM_QUEUED_ENVELOPES, msg = "system resources may be saturated", - "Envelope delay queue is full, evicting oldest entry" + "Envelope delay queue is full, dropping envelope" ); } - if let Some(oldest_root) = - self.awaiting_envelopes_per_root.keys().next().copied() - && let Some((_envelope, delay_key)) = - self.awaiting_envelopes_per_root.remove(&oldest_root) - { - self.envelope_delay_queue.remove(&delay_key); - } + return; } // Register the timeout. @@ -688,6 +718,37 @@ impl ReprocessQueue { self.next_attestation += 1; } + InboundEvent::Msg(UnknownBlockDataColumn(queued_data_column)) => { + let block_root = queued_data_column.beacon_block_root; + + if self.queued_data_columns_count >= MAXIMUM_QUEUED_DATA_COLUMNS { + if self.data_column_delay_debounce.elapsed() { + warn!( + queue_size = MAXIMUM_QUEUED_DATA_COLUMNS, + msg = "system resources may be saturated", + "Data column delay queue is full, dropping column" + ); + } + return; + } + + if let Some((columns, _delay_key)) = + self.awaiting_data_columns_per_root.get_mut(&block_root) + { + // Append to existing entry; the timer for this root is already running. + columns.push(queued_data_column); + } else { + let delay_key = self.data_columns_delay_queue.insert( + block_root, + self.slot_clock.slot_duration() * QUEUED_DATA_COLUMN_DELAY_SLOTS, + ); + + self.awaiting_data_columns_per_root + .insert(block_root, (vec![queued_data_column], delay_key)); + } + + self.queued_data_columns_count += 1; + } InboundEvent::Msg(UnknownLightClientOptimisticUpdate( queued_light_client_optimistic_update, )) => { @@ -800,6 +861,25 @@ impl ReprocessQueue { ); } } + + // Unqueue the data columns we have for this root, if any. + if let Some((data_columns, delay_key)) = + self.awaiting_data_columns_per_root.remove(&block_root) + { + self.data_columns_delay_queue.remove(&delay_key); + self.queued_data_columns_count = self + .queued_data_columns_count + .saturating_sub(data_columns.len()); + for data_column in data_columns { + if self + .ready_work_tx + .try_send(ReadyWork::DataColumn(data_column)) + .is_err() + { + error!(?block_root, "Failed to send data column for reprocessing"); + } + } + } } InboundEvent::Msg(NewLightClientOptimisticUpdate { parent_root }) => { // Unqueue the light client optimistic updates we have for this root, if any. @@ -1053,6 +1133,27 @@ impl ReprocessQueue { ); } } + InboundEvent::ReadyDataColumn(block_root) => { + if let Some((data_columns, _)) = + self.awaiting_data_columns_per_root.remove(&block_root) + { + self.queued_data_columns_count = self + .queued_data_columns_count + .saturating_sub(data_columns.len()); + for data_column in data_columns { + if self + .ready_work_tx + .try_send(ReadyWork::DataColumn(data_column)) + .is_err() + { + error!( + hint = "system may be overloaded", + "Ignored expired gossip data column" + ); + } + } + } + } } metrics::set_gauge_vec( @@ -1581,48 +1682,87 @@ mod tests { assert_eq!(queue.envelope_delay_queue.len(), 1); } + /// Tests that a queued gossip data column is released when its block is imported. #[tokio::test] - async fn envelope_capacity_evicts_oldest() { + async fn data_column_released_on_block_imported() { + create_test_tracing_subscriber(); + + let config = BeaconProcessorConfig::default(); + let (ready_work_tx, mut ready_work_rx) = + mpsc::channel::(config.max_scheduled_work_queue_len); + let (_, reprocess_work_rx) = + mpsc::channel::(config.max_scheduled_work_queue_len); + let slot_clock = Arc::new(testing_slot_clock(12)); + let mut queue = ReprocessQueue::new(ready_work_tx, reprocess_work_rx, slot_clock); + + tokio::time::pause(); + + let beacon_block_root = Hash256::repeat_byte(0xbb); + + let msg = ReprocessQueueMessage::UnknownBlockDataColumn(QueuedGossipDataColumn { + beacon_block_root, + process_fn: Box::new(|| {}), + }); + queue.handle_message(InboundEvent::Msg(msg)); + + assert_eq!(queue.awaiting_data_columns_per_root.len(), 1); + assert!( + queue + .awaiting_data_columns_per_root + .contains_key(&beacon_block_root) + ); + assert_eq!(queue.data_columns_delay_queue.len(), 1); + + // Simulate block import. + queue.handle_message(InboundEvent::Msg(ReprocessQueueMessage::BlockImported { + block_root: beacon_block_root, + parent_root: Hash256::repeat_byte(0x00), + })); + + // Internal state should be cleaned up. + assert!(queue.awaiting_data_columns_per_root.is_empty()); + assert_eq!(queue.data_columns_delay_queue.len(), 0); + + // The column should have been sent to the ready_work channel. + let ready = ready_work_rx.try_recv().expect("column should be ready"); + assert!(matches!(ready, ReadyWork::DataColumn(_))); + } + + /// Tests that an expired gossip data column is pruned cleanly from all internal state. + #[tokio::test] + async fn prune_awaiting_data_columns_per_root() { create_test_tracing_subscriber(); let mut queue = test_queue(); - // Pause time so it only advances manually tokio::time::pause(); - // Fill the queue to capacity. - for i in 0..MAXIMUM_QUEUED_ENVELOPES { - let block_root = Hash256::repeat_byte(i as u8); - let msg = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope { - beacon_block_slot: Slot::new(1), - beacon_block_root: block_root, - process_fn: Box::pin(async {}), - }); - queue.handle_message(InboundEvent::Msg(msg)); - } - assert_eq!( - queue.awaiting_envelopes_per_root.len(), - MAXIMUM_QUEUED_ENVELOPES - ); + let beacon_block_root = Hash256::repeat_byte(0xcd); - // One more should evict the oldest and insert the new one. - let overflow_root = Hash256::repeat_byte(0xff); - let msg = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope { - beacon_block_slot: Slot::new(1), - beacon_block_root: overflow_root, - process_fn: Box::pin(async {}), + let msg = ReprocessQueueMessage::UnknownBlockDataColumn(QueuedGossipDataColumn { + beacon_block_root, + process_fn: Box::new(|| {}), }); queue.handle_message(InboundEvent::Msg(msg)); - // Queue should still be at capacity, with the new root present. - assert_eq!( - queue.awaiting_envelopes_per_root.len(), - MAXIMUM_QUEUED_ENVELOPES - ); + assert_eq!(queue.awaiting_data_columns_per_root.len(), 1); assert!( queue - .awaiting_envelopes_per_root - .contains_key(&overflow_root) + .awaiting_data_columns_per_root + .contains_key(&beacon_block_root) ); + + // Advance time past the delay so the entry expires. + advance_time( + &queue.slot_clock, + 2 * queue.slot_clock.slot_duration() * QUEUED_DATA_COLUMN_DELAY_SLOTS, + ) + .await; + let ready_msg = queue.next().await.unwrap(); + assert!(matches!(ready_msg, InboundEvent::ReadyDataColumn(_))); + queue.handle_message(ready_msg); + + // All internal state should be cleaned up. + assert!(queue.awaiting_data_columns_per_root.is_empty()); } } diff --git a/beacon_node/lighthouse_network/src/config.rs b/beacon_node/lighthouse_network/src/config.rs index db42d0cfa8..4d4d91a456 100644 --- a/beacon_node/lighthouse_network/src/config.rs +++ b/beacon_node/lighthouse_network/src/config.rs @@ -125,6 +125,9 @@ pub struct Config { /// Whether light client protocols should be enabled. pub enable_light_client_server: bool, + /// Whether to enable the deprecated mplex multiplexer alongside yamux. + pub enable_mplex: bool, + /// Configuration for the outbound rate limiter (requests made by this node). pub outbound_rate_limiter_config: Option, @@ -362,6 +365,7 @@ impl Default for Config { proposer_only: false, metrics_enabled: false, enable_light_client_server: true, + enable_mplex: false, outbound_rate_limiter_config: None, invalid_block_storage: None, inbound_rate_limiter_config: None, diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 41d937e324..f5e2442f86 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -466,9 +466,13 @@ impl Network { } }; - // Set up the transport - tcp/quic with noise and mplex - let transport = build_transport(local_keypair.clone(), !config.disable_quic_support) - .map_err(|e| format!("Failed to build transport: {:?}", e))?; + // Set up the transport - tcp/quic with noise and yamux (mplex optional) + let transport = build_transport( + local_keypair.clone(), + !config.disable_quic_support, + config.enable_mplex, + ) + .map_err(|e| format!("Failed to build transport: {:?}", e))?; // use the executor for libp2p struct Executor(task_executor::TaskExecutor); diff --git a/beacon_node/lighthouse_network/src/service/utils.rs b/beacon_node/lighthouse_network/src/service/utils.rs index c7dabcb391..47629f4fd3 100644 --- a/beacon_node/lighthouse_network/src/service/utils.rs +++ b/beacon_node/lighthouse_network/src/service/utils.rs @@ -34,27 +34,39 @@ pub struct Context<'a> { type BoxedTransport = Boxed<(PeerId, StreamMuxerBox)>; /// The implementation supports TCP/IP, QUIC (experimental) over UDP, noise as the encryption layer, and -/// mplex/yamux as the multiplexing layer (when using TCP). +/// yamux as the multiplexing layer (when using TCP). Mplex can be optionally enabled. pub fn build_transport( local_private_key: Keypair, quic_support: bool, + enable_mplex: bool, ) -> std::io::Result { - // mplex config - let mut mplex_config = libp2p_mplex::Config::new(); - mplex_config.set_max_buffer_size(256); - mplex_config.set_max_buffer_behaviour(libp2p_mplex::MaxBufferBehaviour::Block); - // yamux config let yamux_config = yamux::Config::default(); + // Creates the TCP transport layer - let tcp = libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::default().nodelay(true)) - .upgrade(core::upgrade::Version::V1) - .authenticate(generate_noise_config(&local_private_key)) - .multiplex(core::upgrade::SelectUpgrade::new( - yamux_config, - mplex_config, - )) - .timeout(Duration::from_secs(10)); + let tcp: BoxedTransport = if enable_mplex { + // Enable both yamux and mplex. + let mut mplex_config = libp2p_mplex::Config::new(); + mplex_config.set_max_num_streams(32); + mplex_config.set_max_buffer_behaviour(libp2p_mplex::MaxBufferBehaviour::ResetStream); + libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::default().nodelay(true)) + .upgrade(core::upgrade::Version::V1) + .authenticate(generate_noise_config(&local_private_key)) + .multiplex(core::upgrade::SelectUpgrade::new( + yamux_config, + mplex_config, + )) + .timeout(Duration::from_secs(10)) + .boxed() + } else { + // Yamux only + libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::default().nodelay(true)) + .upgrade(core::upgrade::Version::V1) + .authenticate(generate_noise_config(&local_private_key)) + .multiplex(yamux_config) + .timeout(Duration::from_secs(10)) + .boxed() + }; let transport = if quic_support { // Enables Quic // The default quic configuration suits us for now. diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index df94b473a8..9becfd4d59 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -61,8 +61,8 @@ use beacon_processor::work_reprocessing_queue::QueuedColumnReconstruction; use beacon_processor::{ DuplicateCache, GossipAggregatePackage, GossipAttestationBatch, work_reprocessing_queue::{ - QueuedAggregate, QueuedGossipBlock, QueuedGossipEnvelope, QueuedLightClientUpdate, - QueuedUnaggregate, ReprocessQueueMessage, + QueuedAggregate, QueuedGossipBlock, QueuedGossipDataColumn, QueuedGossipEnvelope, + QueuedLightClientUpdate, QueuedUnaggregate, ReprocessQueueMessage, }, }; @@ -657,6 +657,7 @@ impl NetworkBeaconProcessor { subnet_id: DataColumnSubnetId, column_sidecar: Arc>, seen_duration: Duration, + allow_reprocess: bool, ) { let slot = column_sidecar.slot(); let block_root = column_sidecar.block_root(); @@ -738,19 +739,48 @@ impl NetworkBeaconProcessor { .. } => { debug!( - action = "ignoring", + action = "queuing for reprocessing", %unknown_block_root, "Unknown block root for column" ); - // TODO(gloas): wire this into proper lookup sync. Sending - // `UnknownBlockHashFromAttestation` here is a Fulu-shaped fallback that - // mixes column processing with the attestation lookup path and is not - // the right primitive for Gloas column lookups. self.propagate_validation_result( - message_id, + message_id.clone(), peer_id, MessageAcceptance::Ignore, ); + + if allow_reprocess { + // Queue the column for reprocessing when the block arrives. + let processor = self.clone(); + let reprocess_msg = ReprocessQueueMessage::UnknownBlockDataColumn( + QueuedGossipDataColumn { + beacon_block_root: unknown_block_root, + process_fn: Box::new(move || { + let _ = processor.send_gossip_data_column_sidecar( + message_id, + peer_id, + subnet_id, + column_sidecar, + seen_duration, + false, // Do not reprocess this message again. + ); + }), + }, + ); + if self + .beacon_processor_send + .try_send(WorkEvent { + drop_during_sync: false, + work: Work::Reprocess(reprocess_msg), + }) + .is_err() + { + debug!( + %unknown_block_root, + "Failed to queue data column for reprocessing" + ); + } + } } GossipDataColumnError::InvalidVariant | GossipDataColumnError::PubkeyCacheTimeout diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 9f2acd73dc..f3c773eb25 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -36,13 +36,13 @@ use { slot_clock::ManualSlotClock, store::MemoryStore, tokio::sync::mpsc::UnboundedSender, }; -pub use sync_methods::ChainSegmentProcessId; +pub use sync_methods::{BlockProcessingResult, ChainSegmentProcessId}; pub type Error = TrySendError>; mod gossip_methods; mod rpc_methods; -mod sync_methods; +pub(crate) mod sync_methods; mod tests; pub(crate) const FUTURE_SLOT_TOLERANCE: u64 = 1; @@ -201,6 +201,7 @@ impl NetworkBeaconProcessor { subnet_id: DataColumnSubnetId, column_sidecar: Arc>, seen_timestamp: Duration, + allow_reprocess: bool, ) -> Result<(), Error> { let processor = self.clone(); let process_fn = async move { @@ -211,6 +212,7 @@ impl NetworkBeaconProcessor { subnet_id, column_sidecar, seen_timestamp, + allow_reprocess, ) .await }; diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index d8de50a5c9..289a893176 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -3,8 +3,8 @@ use crate::network_beacon_processor::{FUTURE_SLOT_TOLERANCE, NetworkBeaconProces use crate::sync::BatchProcessResult; use crate::sync::manager::CustodyBatchProcessResult; use crate::sync::{ - ChainId, - manager::{BlockProcessType, BlockProcessingResult, SyncMessage, WhichPeerToPenalize}, + ChainId, PeerGroup, SyncNetworkContext, + manager::{BlockProcessType, SyncMessage}, }; use beacon_chain::block_verification_types::LookupBlock; use beacon_chain::block_verification_types::{AsBlock, RangeSyncBlock}; @@ -22,12 +22,13 @@ use beacon_processor::{ }; use beacon_processor::{Work, WorkEvent}; use lighthouse_network::PeerAction; +use lighthouse_network::PeerId; use lighthouse_network::service::api_types::CustodyBackfillBatchId; use logging::crit; use std::sync::Arc; use std::time::Duration; use tracing::{debug, debug_span, error, info, instrument, warn}; -use types::{BlockImportSource, DataColumnSidecarList, Epoch, Hash256}; +use types::{BlockImportSource, DataColumnSidecarList, Epoch, ExecutionBlockHash, Hash256}; /// Id associated to a batch processing request, either a sync batch or a parent lookup. #[derive(Clone, Debug, PartialEq)] @@ -98,7 +99,7 @@ impl NetworkBeaconProcessor { process_type, result: BlockProcessingResult::Error { penalty: None, - reason: "processor_overloaded", + reason: "ignored_processor_overloaded".to_string(), }, }); }; @@ -238,10 +239,9 @@ impl NetworkBeaconProcessor { } // Sync handles these results - let result = classify_processing_result(result, &process_type); self.send_sync_message(SyncMessage::BlockComponentProcessed { process_type, - result, + result: result.into(), }); // Drop the handle to remove the entry from the cache @@ -319,10 +319,9 @@ impl NetworkBeaconProcessor { Err(_) => {} } - let result = classify_processing_result(result, &process_type); self.send_sync_message(SyncMessage::BlockComponentProcessed { process_type, - result, + result: result.into(), }); } @@ -334,7 +333,6 @@ impl NetworkBeaconProcessor { skip_all, fields(?block_root), )] - #[allow(clippy::result_large_err)] pub async fn process_lookup_envelope( self: Arc>, block_root: Hash256, @@ -343,13 +341,15 @@ impl NetworkBeaconProcessor { process_type: BlockProcessType, ) { debug!( + ?block_root, slot = %envelope.slot(), ?process_type, "Processing RPC payload envelope" ); - // Gossip verification covers signature / slot / builder-index / block-hash checks + // Gossip verification runs the same signature / slot / builder-index / block-hash checks // independently of gossip propagation, so we can reuse it for RPC-fetched envelopes. + #[allow(clippy::result_large_err)] let result = match self .chain .clone() @@ -370,10 +370,15 @@ impl NetworkBeaconProcessor { Err(e) => Err(e), }; - let result = classify_envelope_result(result, &process_type); + // TODO(gloas): structured penalty classification arrives with the envelope lookup state + // machine; for now, fold the EnvelopeError into BlockError::InternalError so it flows + // through the existing `BlockProcessingResult::Err` path. + let result: Result = + result.map_err(|e| BlockError::InternalError(format!("envelope: {e}"))); + self.send_sync_message(SyncMessage::BlockComponentProcessed { process_type, - result, + result: result.into(), }); } @@ -955,128 +960,134 @@ impl NetworkBeaconProcessor { } } -/// Translate the beacon-chain processing outcome into a `BlockProcessingResult` the lookup state -/// machine can act on directly. The policy decisions about *whether* and *which peer-class* to -/// penalize live here, on the producer side, so consumers only need to resolve the symbolic -/// `WhichPeerToPenalize` to an actual peer id at penalty time. -fn classify_processing_result( - result: Result, - process_type: &BlockProcessType, -) -> BlockProcessingResult { - let e = match result { - Ok(AvailabilityProcessingStatus::Imported(_)) => { - return BlockProcessingResult::Imported("imported"); - } - Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => { - return BlockProcessingResult::Imported("missing_components"); - } - Err(BlockError::DuplicateFullyImported(_)) => { - return BlockProcessingResult::Imported("duplicate"); - } - Err(BlockError::GenesisBlock) => { - return BlockProcessingResult::Imported("genesis"); - } - Err(e) => e, - }; +/// The classified outcome of submitting a block / blob / column for processing, ready for the +/// lookup state machine to act on without re-inspecting `BlockError`. +#[derive(Debug)] +pub enum BlockProcessingResult { + /// `fully_imported` is true if the lookup is complete; false if `MissingComponents` (the + /// lookup must keep fetching). `info` is a stable label for logs / metrics. + Imported(bool, &'static str), + ParentUnknown { + parent_root: Hash256, + parent_block_hash: Option, + }, + /// Processing failed. `penalty` is `Some` when an attributable peer should be downscored; + /// the third tuple element is the `report_peer` telemetry msg. `reason` is for logs only. + Error { + penalty: Option<(PeerAction, WhichPeerToPenalize, &'static str)>, + reason: String, + }, +} - // Non-attributable failures. - let no_penalty = |reason| BlockProcessingResult::Error { - penalty: None, - reason, - }; - match &e { - BlockError::BeaconChainError(_) => return no_penalty("beacon_chain_error"), - BlockError::DuplicateImportStatusUnknown(_) => { - return no_penalty("duplicate_unknown_status"); +impl From> for BlockProcessingResult { + fn from(result: Result) -> Self { + fn block_peer_penalty>( + err: E, + ) -> Option<(PeerAction, WhichPeerToPenalize, &'static str)> { + Some(( + PeerAction::MidToleranceError, + WhichPeerToPenalize::BlockPeer, + err.into(), + )) } - BlockError::AvailabilityCheck(inner) - if inner.category() == AvailabilityCheckErrorCategory::Internal => - { - return no_penalty("availability_internal"); + match result { + Ok(AvailabilityProcessingStatus::Imported(_)) => Self::Imported(true, "imported"), + Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => { + Self::Imported(false, "missing_components") + } + Err(e) => { + let penalty = match &e { + BlockError::DuplicateFullyImported(_) => { + return Self::Imported(true, "duplicate"); + } + BlockError::GenesisBlock => return Self::Imported(true, "genesis"), + BlockError::ParentUnknown { + parent_root, + parent_block_hash, + } => { + return Self::ParentUnknown { + parent_root: *parent_root, + parent_block_hash: *parent_block_hash, + }; + } + BlockError::BeaconChainError(_) | BlockError::InternalError(_) => None, + BlockError::DuplicateImportStatusUnknown(_) => None, + BlockError::AvailabilityCheck(inner) => match inner { + AvailabilityCheckError::InvalidColumn((Some(idx), _)) => Some(( + PeerAction::MidToleranceError, + WhichPeerToPenalize::CustodyPeerForColumn(*idx), + (&e).into(), + )), + inner => match inner.category() { + AvailabilityCheckErrorCategory::Internal => None, + AvailabilityCheckErrorCategory::Malicious => block_peer_penalty(inner), + }, + }, + BlockError::ExecutionPayloadError(epe) => { + if epe.penalize_peer() { + block_peer_penalty(epe) + } else { + None + } + } + // Remaining invalid blocks: penalize the block peer. Listed explicitly so a + // new `BlockError` variant forces a compile error here. + BlockError::FutureSlot { .. } + | BlockError::StateRootMismatch { .. } + | BlockError::WouldRevertFinalizedSlot { .. } + | BlockError::NotFinalizedDescendant { .. } + | BlockError::BlockSlotLimitReached + | BlockError::IncorrectBlockProposer { .. } + | BlockError::UnknownValidator(_) + | BlockError::InvalidSignature(_) + | BlockError::BlockIsNotLaterThanParent { .. } + | BlockError::NonLinearParentRoots + | BlockError::NonLinearSlots + | BlockError::PerBlockProcessingError(_) + | BlockError::WeakSubjectivityConflict + | BlockError::InconsistentFork(_) + | BlockError::ParentExecutionPayloadInvalid { .. } + | BlockError::KnownInvalidExecutionPayload(_) + | BlockError::Slashable + | BlockError::EnvelopeBlockRootUnknown(_) + | BlockError::OptimisticSyncNotSupported { .. } + | BlockError::InvalidBlobCount { .. } + | BlockError::BidParentRootMismatch { .. } => block_peer_penalty(&e), + }; + Self::Error { + penalty, + reason: format!("{e:?}"), + } + } } - BlockError::ExecutionPayloadError(epe) if !epe.penalize_peer() => { - return no_penalty("execution_payload"); - } - BlockError::ParentUnknown { .. } => return no_penalty("parent_unknown"), - // Bad-column attribution: penalize the custody peer that served the invalid column. - BlockError::AvailabilityCheck(AvailabilityCheckError::InvalidColumn((Some(idx), _))) => { - return BlockProcessingResult::Error { - penalty: Some(( - PeerAction::MidToleranceError, - WhichPeerToPenalize::CustodyPeerForColumn(*idx), - )), - reason: "lookup_custody_column_processing_failure", - }; - } - _ => {} - } - - // Attributable to the block peer (which is also the data peer pre-Gloas). - let reason = match process_type { - BlockProcessType::SingleBlock { .. } => "lookup_block_processing_failure", - BlockProcessType::SingleCustodyColumn(_) => "lookup_custody_column_processing_failure", - // Payload envelopes flow through classify_envelope_result; this branch shouldn't fire, - // but produce a sensible reason in case it ever does. - BlockProcessType::SinglePayloadEnvelope(_) => "lookup_envelope_processing_failure", - }; - BlockProcessingResult::Error { - penalty: Some(( - PeerAction::MidToleranceError, - WhichPeerToPenalize::BlockPeer, - )), - reason, } } -/// Translate an envelope-processing outcome into a `BlockProcessingResult`. Mirrors -/// `classify_processing_result` for the Gloas payload-envelope path. -fn classify_envelope_result( - result: Result< - AvailabilityProcessingStatus, - beacon_chain::payload_envelope_verification::EnvelopeError, - >, - _process_type: &BlockProcessType, -) -> BlockProcessingResult { - use beacon_chain::payload_envelope_verification::EnvelopeError; +/// Selector for which peer(s) in a `PeerGroup` to downscore. +#[derive(Debug, Clone, Copy)] +pub enum WhichPeerToPenalize { + /// All peers in the group (block peer, or all data peers). + BlockPeer, + /// Only the peer(s) that served the given column index. + CustodyPeerForColumn(u64), +} - let no_penalty = |reason| BlockProcessingResult::Error { - penalty: None, - reason, - }; - let penalize = |reason| BlockProcessingResult::Error { - penalty: Some(( - PeerAction::LowToleranceError, - WhichPeerToPenalize::BlockPeer, - )), - reason, - }; - match result { - Ok(AvailabilityProcessingStatus::Imported(_)) => { - BlockProcessingResult::Imported("envelope_imported") +impl WhichPeerToPenalize { + pub fn apply( + self, + action: PeerAction, + peer_group: &PeerGroup, + msg: &'static str, + cx: &mut SyncNetworkContext, + ) { + let peers: Vec = match self { + WhichPeerToPenalize::BlockPeer => peer_group.all().copied().collect(), + WhichPeerToPenalize::CustodyPeerForColumn(idx) => { + peer_group.of_index(idx as usize).copied().collect() + } + }; + for peer in peers { + cx.report_peer(peer, action, msg); } - Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => { - BlockProcessingResult::Imported("envelope_missing_components") - } - Err( - EnvelopeError::BeaconChainError(_) - | EnvelopeError::BeaconStateError(_) - | EnvelopeError::ImportError(_) - | EnvelopeError::UnknownValidator { .. } - | EnvelopeError::PriorToFinalization { .. } - | EnvelopeError::BlockRootUnknown { .. }, - ) => no_penalty("envelope_non_attributable"), - Err(EnvelopeError::ExecutionPayloadError(epe)) if !epe.penalize_peer() => { - no_penalty("envelope_execution_payload") - } - // Anything else: peer served an invalid envelope. - Err( - EnvelopeError::BadSignature - | EnvelopeError::BuilderIndexMismatch { .. } - | EnvelopeError::SlotMismatch { .. } - | EnvelopeError::BlockHashMismatch { .. } - | EnvelopeError::IncorrectBlockProposer { .. } - | EnvelopeError::EnvelopeProcessingError(_) - | EnvelopeError::ExecutionPayloadError(_), - ) => penalize("lookup_envelope_processing_failure"), } } diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index c0b093e254..ad98851532 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -412,6 +412,7 @@ impl TestRig { DataColumnSubnetId::from_column_index(*data_column.index(), &self.chain.spec), data_column.clone(), Duration::from_secs(0), + true, ) .unwrap(); } diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index a8e5c9ae4a..277ece0aa8 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -422,6 +422,7 @@ impl Router { subnet_id, column_sidecar, seen_timestamp, + true, ), ) } diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs deleted file mode 100644 index 4306458615..0000000000 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ /dev/null @@ -1,164 +0,0 @@ -use crate::sync::block_lookups::single_block_lookup::{ - LookupRequestError, SingleBlockLookup, SingleLookupRequestState, -}; -use crate::sync::block_lookups::{BlockRequestState, CustodyRequestState, PeerId}; -use crate::sync::manager::BlockProcessType; -use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext}; -use beacon_chain::BeaconChainTypes; -use lighthouse_network::service::api_types::Id; -use parking_lot::RwLock; -use std::collections::HashSet; -use std::sync::Arc; -use types::{DataColumnSidecarList, SignedBeaconBlock}; - -use super::SingleLookupId; -use super::single_block_lookup::{ComponentRequests, DownloadResult}; - -#[derive(Debug, Copy, Clone)] -pub enum ResponseType { - Block, - CustodyColumn, -} - -/// This trait unifies common single block lookup functionality across blocks and data columns. -/// This includes making requests, verifying responses, and handling processing results. A -/// `SingleBlockLookup` includes both a `BlockRequestState` and a `CustodyRequestState`, this trait -/// is implemented for each. -/// -/// The use of the `ResponseType` associated type gives us a degree of type -/// safety when handling a block/column response ensuring we only mutate the correct corresponding -/// state. -pub trait RequestState { - /// The type created after validation. - type VerifiedResponseType: Clone; - - /// Request the network context to prepare a request of a component of `block_root`. If the - /// request is not necessary because the component is already known / processed, return false. - /// Return true if it sent a request and we can expect an event back from the network. - fn make_request( - &self, - id: Id, - lookup_peers: Arc>>, - expected_blobs: usize, - cx: &mut SyncNetworkContext, - ) -> Result; - - /* Response handling methods */ - - /// Send the response to the beacon processor. - fn send_for_processing( - id: Id, - result: DownloadResult, - cx: &SyncNetworkContext, - ) -> Result<(), LookupRequestError>; - - /* Utility methods */ - - /// Returns the `ResponseType` associated with this trait implementation. Useful in logging. - fn response_type() -> ResponseType; - - /// A getter for the `BlockRequestState` or `CustodyRequestState` associated with this trait. - fn request_state_mut(request: &mut SingleBlockLookup) -> Result<&mut Self, &'static str>; - - /// A getter for a reference to the `SingleLookupRequestState` associated with this trait. - fn get_state(&self) -> &SingleLookupRequestState; - - /// A getter for a mutable reference to the SingleLookupRequestState associated with this trait. - fn get_state_mut(&mut self) -> &mut SingleLookupRequestState; -} - -impl RequestState for BlockRequestState { - type VerifiedResponseType = Arc>; - - fn make_request( - &self, - id: SingleLookupId, - lookup_peers: Arc>>, - _: usize, - cx: &mut SyncNetworkContext, - ) -> Result { - cx.block_lookup_request(id, lookup_peers, self.requested_block_root) - .map_err(LookupRequestError::SendFailedNetwork) - } - - fn send_for_processing( - id: SingleLookupId, - download_result: DownloadResult, - cx: &SyncNetworkContext, - ) -> Result<(), LookupRequestError> { - let DownloadResult { - value, - block_root, - seen_timestamp, - .. - } = download_result; - cx.send_block_for_processing(id, block_root, value, seen_timestamp) - .map_err(LookupRequestError::SendFailedProcessor) - } - - fn response_type() -> ResponseType { - ResponseType::Block - } - fn request_state_mut(request: &mut SingleBlockLookup) -> Result<&mut Self, &'static str> { - Ok(&mut request.block_request_state) - } - fn get_state(&self) -> &SingleLookupRequestState { - &self.state - } - fn get_state_mut(&mut self) -> &mut SingleLookupRequestState { - &mut self.state - } -} - -impl RequestState for CustodyRequestState { - type VerifiedResponseType = DataColumnSidecarList; - - fn make_request( - &self, - id: Id, - lookup_peers: Arc>>, - _: usize, - cx: &mut SyncNetworkContext, - ) -> Result { - cx.custody_lookup_request(id, self.block_root, self.slot, lookup_peers) - .map_err(LookupRequestError::SendFailedNetwork) - } - - fn send_for_processing( - id: Id, - download_result: DownloadResult, - cx: &SyncNetworkContext, - ) -> Result<(), LookupRequestError> { - let DownloadResult { - value, - block_root, - seen_timestamp, - .. - } = download_result; - cx.send_custody_columns_for_processing( - id, - block_root, - value, - seen_timestamp, - BlockProcessType::SingleCustodyColumn(id), - ) - .map_err(LookupRequestError::SendFailedProcessor) - } - - fn response_type() -> ResponseType { - ResponseType::CustodyColumn - } - fn request_state_mut(request: &mut SingleBlockLookup) -> Result<&mut Self, &'static str> { - match &mut request.component_requests { - ComponentRequests::WaitingForBlock => Err("waiting for block"), - ComponentRequests::ActiveCustodyRequest(request) => Ok(request), - ComponentRequests::NotNeeded { .. } => Err("not needed"), - } - } - fn get_state(&self) -> &SingleLookupRequestState { - &self.state - } - fn get_state_mut(&mut self) -> &mut SingleLookupRequestState { - &mut self.state - } -} diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 058d1a7808..15b5594747 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -21,14 +21,14 @@ //! returned to this module as `LookupRequestResult` variants. use self::parent_chain::{NodeChain, compute_parent_chains}; -pub use self::single_block_lookup::{AwaitingParent, DownloadResult}; -use self::single_block_lookup::{LookupRequestError, LookupResult, SingleBlockLookup}; -use super::manager::{BlockProcessType, BlockProcessingResult, SLOT_IMPORT_TOLERANCE}; -use super::network_context::{PeerGroup, RpcResponseError, SyncNetworkContext}; +pub use self::single_block_lookup::DownloadResult; +use self::single_block_lookup::{LookupRequestError, LookupResult, PeerType, SingleBlockLookup}; +use super::manager::{BlockProcessType, SLOT_IMPORT_TOLERANCE}; +use super::network_context::{RpcResponseError, SyncNetworkContext}; use crate::metrics; +use crate::network_beacon_processor::BlockProcessingResult; use crate::sync::SyncMessage; use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor; -use crate::sync::block_lookups::single_block_lookup::PeerType; use beacon_chain::BeaconChainTypes; use fnv::FnvHashMap; use lighthouse_network::PeerId; @@ -39,7 +39,10 @@ use std::sync::Arc; use std::time::Duration; use store::Hash256; use tracing::{debug, error, warn}; -use types::{EthSpec, SignedBeaconBlock, SignedExecutionPayloadEnvelope}; +use types::{ + DataColumnSidecarList, EthSpec, ExecutionBlockHash, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, +}; pub mod parent_chain; mod single_block_lookup; @@ -70,12 +73,11 @@ const LOOKUP_MAX_DURATION_NO_PEERS_SECS: u64 = 10; /// take at most 2 GB. 200 lookups allow 3 parallel chains of depth 64 (current maximum). const MAX_LOOKUPS: usize = 200; -type BlockDownloadResponse = - Result<(Arc>, PeerGroup, Duration), RpcResponseError>; +type BlockDownloadResponse = Result>>, RpcResponseError>; type CustodyDownloadResponse = - Result<(types::DataColumnSidecarList, PeerGroup, Duration), RpcResponseError>; + Result>, RpcResponseError>; type PayloadDownloadResponse = - Result<(Arc>, PeerGroup, Duration), RpcResponseError>; + Result>>, RpcResponseError>; pub enum BlockComponent { Block(DownloadResult>>), @@ -84,14 +86,6 @@ pub enum BlockComponent { pub type SingleLookupId = u32; -#[derive(Debug, Copy, Clone)] -pub enum NewLookupTrigger { - // `ParentUnknown` carries the parent block root for logging/metrics; not consumed - // elsewhere yet. Keep the field so the trigger reason stays in debug logs. - ParentUnknown(#[allow(dead_code)] Hash256), - NetworkMessage, -} - pub struct BlockLookups { /// A cache of block roots that must be ignored for some time to prevent useless searches. For /// example if a chain is too long, its lookup chain is dropped, and range sync is expected to @@ -179,33 +173,27 @@ impl BlockLookups { &mut self, block_root: Hash256, block_component: BlockComponent, - awaiting_parent: AwaitingParent, + parent_root: Hash256, + parent_block_hash: Option, peer_id: PeerId, - new_lookup_trigger: NewLookupTrigger, cx: &mut SyncNetworkContext, ) -> bool { - // We don't know the child's fork yet (no block downloaded), use PreGloas conservatively. - // The correct AwaitingParent will be set when the child's block downloads. - let parent_lookup_exists = self.search_parent_of_child( - awaiting_parent, - block_root, - &[peer_id], - new_lookup_trigger, - cx, - ); + let parent_lookup_exists = + self.search_parent_of_child(parent_root, parent_block_hash, block_root, &[peer_id], cx); // Only create the child lookup if the parent exists if parent_lookup_exists { // `search_parent_of_child` ensures that the parent lookup exists so we can safely wait for it self.new_current_lookup( block_root, Some(block_component), - Some(awaiting_parent), + Some(parent_root), // On a `UnknownParentBlock` or `UnknownParentSidecarHeader` event the peer is not - // required to have the rest of the block components (refer to decoupled blob - // gossip). Create the lookup with zero peers to house the block components. + // required to have the rest of the block components. Create the lookup with zero + // peers to house the block components. We don't know the child's fork yet, so use + // `PreGloas` conservatively; the correct peer set is established when the child's + // block downloads and its FULL children begin attesting. &[], &PeerType::PreGloas, - new_lookup_trigger, cx, ) } else { @@ -221,18 +209,9 @@ impl BlockLookups { &mut self, block_root: Hash256, peer_source: &[PeerId], - new_lookup_trigger: NewLookupTrigger, cx: &mut SyncNetworkContext, ) -> bool { - self.new_current_lookup( - block_root, - None, - None, - peer_source, - &PeerType::PreGloas, - new_lookup_trigger, - cx, - ) + self.new_current_lookup(block_root, None, None, peer_source, &PeerType::PreGloas, cx) } /// A block or blob triggers the search of a parent. @@ -244,20 +223,18 @@ impl BlockLookups { #[must_use = "only reference the new lookup if returns true"] pub fn search_parent_of_child( &mut self, - awaiting_parent: AwaitingParent, + block_root_to_search: Hash256, + // Post-Gloas only: the child's bid `parent_block_hash` (the parent's execution hash). Peers + // that imported the FULL child can serve the parent's payload envelope and data columns. + parent_block_hash: Option, child_block_root_trigger: Hash256, peers: &[PeerId], - new_lookup_trigger: NewLookupTrigger, cx: &mut SyncNetworkContext, ) -> bool { - let block_root_to_search = awaiting_parent.parent_root(); - - // The zero hash is the parent root of the genesis block, not a real block. - if block_root_to_search == Hash256::ZERO { - debug!("Not searching for zero hash (parent of genesis)"); - return false; - } - + let peer_type = match parent_block_hash { + Some(execution_hash) => PeerType::PostGloas(execution_hash), + None => PeerType::PreGloas, + }; let parent_chains = self.active_parent_lookups(); for (chain_idx, parent_chain) in parent_chains.iter().enumerate() { @@ -345,38 +322,21 @@ impl BlockLookups { } } - // Child's peers can serve block, and data + payload if the parent is full. - // In Gloas, data and payload are coupled: empty blocks have neither. - // Pre-Gloas: data is always needed with block, payload is never needed. - let peer_type = match awaiting_parent.gloas_bid_parent_hash() { - Some(parent_hash) => PeerType::PostGloas(parent_hash), - None => PeerType::PreGloas, - }; // `block_root_to_search` is a failed chain check happens inside new_current_lookup - self.new_current_lookup( - block_root_to_search, - None, - None, - peers, - &peer_type, - new_lookup_trigger, - cx, - ) + self.new_current_lookup(block_root_to_search, None, None, peers, &peer_type, cx) } /// Searches for a single block hash. If the blocks parent is unknown, a chain of blocks is /// constructed. /// Returns true if the lookup is created or already exists #[must_use = "only reference the new lookup if returns true"] - #[allow(clippy::too_many_arguments)] fn new_current_lookup( &mut self, block_root: Hash256, block_component: Option>, - awaiting_parent: Option, + awaiting_parent: Option, peers: &[PeerId], peer_type: &PeerType, - new_lookup_trigger: NewLookupTrigger, cx: &mut SyncNetworkContext, ) -> bool { // If this block or it's parent is part of a known ignored chain, ignore it. @@ -411,7 +371,7 @@ impl BlockLookups { && !self .single_block_lookups .iter() - .any(|(_, lookup)| lookup.is_for_block(awaiting_parent.parent_root())) + .any(|(_, lookup)| lookup.is_for_block(awaiting_parent)) { warn!(block_root = ?awaiting_parent, "Ignoring child lookup parent lookup not found"); return false; @@ -452,7 +412,6 @@ impl BlockLookups { .map(|root| root.to_string()) .unwrap_or("none".to_owned()), id = lookup.id, - ?new_lookup_trigger, "Created block lookup" ); metrics::inc_counter(&metrics::SYNC_LOOKUP_CREATED); @@ -520,7 +479,7 @@ impl BlockLookups { pub fn peer_disconnected(&mut self, peer_id: &PeerId) { for (id, lookup) in self.single_block_lookups.iter_mut() { lookup.remove_peer(peer_id); - if !lookup.has_peers() { + if lookup.has_no_peers() { debug!(%id, "Lookup has no peers"); } } @@ -535,82 +494,31 @@ impl BlockLookups { cx: &mut SyncNetworkContext, ) { let lookup_id = process_type.id(); + let Some(lookup) = self.single_block_lookups.get_mut(&lookup_id) else { + debug!(id = lookup_id, "Unknown single block lookup"); + return; + }; + + debug!( + block_root = ?lookup.block_root(), + id = lookup_id, + ?process_type, + ?result, + "Received lookup processing result" + ); + let lookup_result = match process_type { - BlockProcessType::SingleBlock { .. } => { - self.on_block_processing_result(lookup_id, result, cx) - } + BlockProcessType::SingleBlock { .. } => lookup.on_block_processing_result(result, cx), BlockProcessType::SingleCustodyColumn(_) => { - self.on_data_processing_result(lookup_id, result, cx) + lookup.on_data_processing_result(result, cx) } BlockProcessType::SinglePayloadEnvelope(_) => { - self.on_payload_processing_result(lookup_id, result, cx) + lookup.on_payload_processing_result(result, cx) } }; self.on_lookup_result(lookup_id, lookup_result, "processing_result", cx); } - /// Handle block processing result. The block is sent for processing alone (without data). - /// On success: marks block processing done and advances data/payload streams. - /// On error: penalizes block peer, resets all streams, retries from scratch. - fn on_block_processing_result( - &mut self, - lookup_id: SingleLookupId, - result: BlockProcessingResult, - cx: &mut SyncNetworkContext, - ) -> Result { - let Some(lookup) = self.single_block_lookups.get_mut(&lookup_id) else { - debug!(id = lookup_id, "Unknown single block lookup"); - return Err(LookupRequestError::UnknownLookup); - }; - debug!( - block_root = ?lookup.block_root(), - id = lookup_id, - ?result, - "Received block processing result" - ); - lookup.on_block_processing_result(result, cx) - } - - /// Handle data processing result (blobs or custody columns). - fn on_data_processing_result( - &mut self, - lookup_id: SingleLookupId, - result: BlockProcessingResult, - cx: &mut SyncNetworkContext, - ) -> Result { - let Some(lookup) = self.single_block_lookups.get_mut(&lookup_id) else { - debug!(id = lookup_id, "Unknown single block lookup"); - return Err(LookupRequestError::UnknownLookup); - }; - debug!( - block_root = ?lookup.block_root(), - id = lookup_id, - ?result, - "Received data processing result" - ); - lookup.on_data_processing_result(result, cx) - } - - /// Handle payload envelope processing result (Gloas only). - fn on_payload_processing_result( - &mut self, - lookup_id: SingleLookupId, - result: BlockProcessingResult, - cx: &mut SyncNetworkContext, - ) -> Result { - let Some(lookup) = self.single_block_lookups.get_mut(&lookup_id) else { - debug!(id = lookup_id, "Unknown single block lookup"); - return Err(LookupRequestError::UnknownLookup); - }; - debug!( - block_root = ?lookup.block_root(), - id = lookup_id, - ?result, - "Received payload envelope processing result" - ); - lookup.on_payload_processing_result(result, cx) - } - pub fn on_external_processing_result( &mut self, block_root: Hash256, @@ -629,6 +537,14 @@ impl BlockLookups { let lookup_result = if imported { Ok(LookupResult::Completed) } else { + // A lookup may be in the following state: + // - Block awaiting processing from a different source + // - Blobs downloaded processed, and inserted into the da_checker + // + // At this point the block fails processing (e.g. execution engine offline) and it is + // removed from the da_checker. Note that ALL components are removed from the da_checker + // so when we re-download and process the block we get the error + // MissingComponentsAfterAllProcessed and get stuck. lookup.reset_requests(); lookup.continue_requests(cx) }; @@ -641,7 +557,7 @@ impl BlockLookups { let mut lookup_results = vec![]; // < need to buffer lookup results to not re-borrow &mut self for (id, lookup) in self.single_block_lookups.iter_mut() { - if lookup.awaiting_parent().map(|a| a.parent_root()) == Some(block_root) { + if lookup.awaiting_parent() == Some(block_root) { lookup.resolve_awaiting_parent(); debug!( parent_root = ?block_root, @@ -677,10 +593,7 @@ impl BlockLookups { let child_lookups = self .single_block_lookups .iter() - .filter(|(_, lookup)| { - lookup.awaiting_parent().map(|a| a.parent_root()) - == Some(dropped_lookup.block_root()) - }) + .filter(|(_, lookup)| lookup.awaiting_parent() == Some(dropped_lookup.block_root())) .map(|(id, _)| *id) .collect::>(); @@ -702,16 +615,16 @@ impl BlockLookups { match result { Ok(LookupResult::Pending) => true, Ok(LookupResult::ParentUnknown { - awaiting_parent, + parent_root, + parent_block_hash, block_root, peers, - .. }) => { if self.search_parent_of_child( - awaiting_parent, + parent_root, + parent_block_hash, block_root, &peers, - NewLookupTrigger::ParentUnknown(awaiting_parent.parent_root()), cx, ) { true @@ -797,7 +710,7 @@ impl BlockLookups { .filter(|lookup| { // Do not drop lookup that are awaiting events to prevent inconsinstencies. If a // lookup gets stuck, it will be eventually pruned by `drop_stuck_lookups` - !lookup.has_peers() + lookup.has_no_peers() && lookup.elapsed_since_created() > Duration::from_secs(LOOKUP_MAX_DURATION_NO_PEERS_SECS) && !lookup.is_awaiting_event() @@ -868,16 +781,17 @@ impl BlockLookups { &'a self, lookup: &'a SingleBlockLookup, ) -> Result<&'a SingleBlockLookup, String> { - if let Some(awaiting) = lookup.awaiting_parent() { - let parent_root = awaiting.parent_root(); + if let Some(awaiting_parent) = lookup.awaiting_parent() { if let Some(lookup) = self .single_block_lookups .values() - .find(|l| l.block_root() == parent_root) + .find(|l| l.block_root() == awaiting_parent) { self.find_oldest_ancestor_lookup(lookup) } else { - Err(format!("Lookup references unknown parent {parent_root:?}")) + Err(format!( + "Lookup references unknown parent {awaiting_parent:?}" + )) } } else { Ok(lookup) @@ -885,9 +799,9 @@ impl BlockLookups { } /// Adds peers to a lookup and its ancestors recursively. - /// - Block peers are added at each level (needed for block download). - /// - When recursing from child to parent, also adds to parent's data/payload peer sets, - /// since children arriving activates the parent's data/payload downloads. + /// + /// Note: Takes a `lookup_id` as argument to allow recursion on mutable lookups, without having + /// to duplicate the code to add peers to a lookup fn add_peers_to_lookup_and_ancestors( &mut self, lookup_id: SingleLookupId, @@ -912,15 +826,17 @@ impl BlockLookups { } } - if let Some(awaiting) = lookup.awaiting_parent() { - let parent_root = awaiting.parent_root(); + if let Some(parent_root) = lookup.awaiting_parent() { + // When recursing from child to parent, the parent's peer set is keyed by the child's + // bid `parent_block_hash` (post-Gloas). A peer that imported this FULL child holds the + // parent's payload + columns. + let parent_peer_type = lookup.awaiting_parent_peer_type(); if let Some((&parent_id, _)) = self .single_block_lookups .iter() .find(|(_, l)| l.block_root() == parent_root) { - let peer_type = PeerType::from_awaiting_parent(awaiting); - self.add_peers_to_lookup_and_ancestors(parent_id, peers, &peer_type, cx) + self.add_peers_to_lookup_and_ancestors(parent_id, peers, &parent_peer_type, cx) } else { Err(format!("Lookup references unknown parent {parent_root:?}")) } diff --git a/beacon_node/network/src/sync/block_lookups/parent_chain.rs b/beacon_node/network/src/sync/block_lookups/parent_chain.rs index 120ce5b1cc..5deea1dd94 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_chain.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_chain.rs @@ -13,7 +13,7 @@ impl From<&SingleBlockLookup> for Node { fn from(value: &SingleBlockLookup) -> Self { Self { block_root: value.block_root(), - parent_root: value.awaiting_parent().map(|a| a.parent_root()), + parent_root: value.awaiting_parent(), } } } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 78e96e238f..1aa06efa93 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -1,14 +1,14 @@ use super::{BlockComponent, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS}; +use crate::network_beacon_processor::BlockProcessingResult; use crate::sync::block_lookups::{ BlockDownloadResponse, CustodyDownloadResponse, PayloadDownloadResponse, }; -use crate::sync::manager::{BlockProcessType, BlockProcessingResult}; +use crate::sync::manager::BlockProcessType; use crate::sync::network_context::{ LookupRequestResult, PeerGroup, ReqId, RpcRequestSendError, RpcResponseError, SendErrorProcessor, SyncNetworkContext, }; use beacon_chain::BeaconChainTypes; -use beacon_chain::BlockProcessStatus; use beacon_chain::block_verification_types::AsBlock; use educe::Educe; use lighthouse_network::service::api_types::Id; @@ -18,78 +18,34 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use store::Hash256; use strum::IntoStaticStr; -use tracing::{Span, debug, debug_span}; +use tracing::{Span, debug_span}; use types::{ - ChainSpec, DataColumnSidecarList, EthSpec, ExecutionBlockHash, ForkName, SignedBeaconBlock, + DataColumnSidecarList, EthSpec, ExecutionBlockHash, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot, }; -/// What a child lookup is waiting for its parent to resolve. -/// -/// `parent_hash` is `Some` only post-Gloas: the child's bid references the -/// parent's payload execution hash, which lets us determine whether the parent -/// is full (payload envelope was published) or empty. Pre-Gloas lookups never -/// need to distinguish — they always wait for the full block+data set. -#[derive(Debug, Clone, Copy)] -pub struct AwaitingParent { - parent_root: Hash256, - gloas_bid_parent_hash: Option, -} - -impl AwaitingParent { - pub fn parent_root(&self) -> Hash256 { - self.parent_root - } - - pub fn gloas_bid_parent_hash(&self) -> Option { - self.gloas_bid_parent_hash - } - - pub fn from_block(block: &SignedBeaconBlock) -> Self { - Self { - parent_root: block.message().parent_root(), - gloas_bid_parent_hash: if let Ok(bid) = - block.message().body().signed_execution_payload_bid() - { - Some(bid.message.parent_block_hash) - } else { - None - }, - } - } - - pub fn from_block_header( +// Dedicated enum for LookupResult to force its usage +#[must_use = "LookupResult must be handled with on_lookup_result"] +pub enum LookupResult { + /// Lookup completed successfully + Completed, + /// Lookup is expecting some future event from the network + Pending, + /// Block's parent is not known to fork-choice, a parent lookup is needed + ParentUnknown { parent_root: Hash256, - slot: Slot, - spec: &ChainSpec, - ) -> Result { - if spec.fork_name_at_slot::(slot).gloas_enabled() { - Err("AwaitingParent can not be created from a Gloas header".to_owned()) - } else { - Ok(Self { - parent_root, - gloas_bid_parent_hash: None, - }) - } - } -} - -#[derive(Debug, Clone)] -#[allow(dead_code)] -pub struct DownloadResult { - pub value: T, - pub block_root: Hash256, - pub seen_timestamp: Duration, - pub peer_group: PeerGroup, + /// Post-Gloas only: the child's bid `parent_block_hash`. Lets the parent lookup partition + /// peers (a peer that imported this FULL child holds the parent's payload + columns). + parent_block_hash: Option, + block_root: Hash256, + peers: Vec, + }, } #[derive(Debug, PartialEq, Eq, IntoStaticStr)] pub enum LookupRequestError { /// Too many failed attempts - TooManyAttempts { - /// The failed attempts were primarily due to processing failures. - cannot_process: bool, - }, + TooManyAttempts, /// Error sending event to network SendFailedNetwork(RpcRequestSendError), /// Error sending event to processor @@ -107,411 +63,125 @@ pub enum LookupRequestError { expected_req_id: ReqId, req_id: ReqId, }, - InternalError(String), } -// Dedicated enum for LookupResult to force its usage -#[must_use = "LookupResult must be handled with on_lookup_result"] -pub enum LookupResult { - /// Lookup completed successfully - Completed, - /// Lookup is expecting some future event from the network - Pending, - /// Block's parent is not known to fork-choice, a parent lookup is needed - ParentUnknown { - awaiting_parent: AwaitingParent, - block_root: Hash256, - peers: Vec, - }, -} +type PeerSet = Arc>>; +/// Peers that claim to have imported a FULL child of this lookup's block, keyed by the child's bid +/// `parent_block_hash` (which equals this block's bid `block_hash` when the child is FULL). Only +/// such peers are proven to hold this block's execution payload envelope and its data columns. +type GloasChildPeers = Arc>>; -#[derive(Educe)] -#[educe(Debug)] -enum BlockRequest { - /// Block downloading or awaiting download - Downloading { - block_root: Hash256, - state: SingleLookupRequestState>>, - }, - /// Block downloaded, waiting for parent check + send for processing - Downloaded { - #[educe(Debug(ignore))] - block: Arc>, - peer: PeerId, - }, - /// Block sent for processing, awaiting result - Processing { - #[educe(Debug(ignore))] - block: Arc>, - peer: PeerId, - }, - /// Block processing complete. `peer` is retained so data/payload processing failures - /// after the block has been imported can still be attributed back to the peer that - /// served the block (they are typically the same peer for blobs). `None` when the - /// block bypassed the download path (cache hit in the availability checker). - Complete { - #[educe(Debug(ignore))] - block: Arc>, - peer: Option, - }, +#[derive(Debug)] +struct BlockRequest { + state: SingleLookupRequestState>>, } impl BlockRequest { - fn new(block_root: Hash256) -> Self { - BlockRequest::Downloading { - block_root, + fn new() -> Self { + Self { state: SingleLookupRequestState::new(), } } - fn new_with_processing_failures(block_root: Hash256, failed_processing: u8) -> Self { - BlockRequest::Downloading { - block_root, - state: SingleLookupRequestState::new_with_processing_failures(failed_processing), - } - } - - fn peek_block(&self) -> Option<&Arc>> { - match self { - BlockRequest::Downloading { state, .. } => state.peek_downloaded_data(), - BlockRequest::Downloaded { block, .. } - | BlockRequest::Processing { block, .. } - | BlockRequest::Complete { block, .. } => Some(block), - } - } - - fn peek_slot(&self) -> Option { - self.peek_block().map(|b| b.slot()) - } - - fn is_awaiting_event(&self) -> bool { - match self { - BlockRequest::Downloading { state, .. } => state.is_awaiting_event(), - BlockRequest::Processing { .. } => true, - _ => false, - } - } - fn is_complete(&self) -> bool { - matches!(self, BlockRequest::Complete { .. }) - } - - /// Best-effort lookup of the block: prefer the in-flight download if we have it; otherwise - /// fall back to the chain's processing-status cache (the block may have arrived via gossip / - /// HTTP API before this lookup downloads it). - fn peek_block_or_cached>( - &self, - block_root: Hash256, - cx: &mut SyncNetworkContext, - ) -> Option>> { - self.peek_block().cloned().or_else(|| { - match cx.chain.get_block_process_status(&block_root) { - BlockProcessStatus::NotValidated(block, _) - | BlockProcessStatus::ExecutionValidated(block) => Some(block), - BlockProcessStatus::Unknown => None, - } - }) - } - - fn insert_verified_response( - &mut self, - result: DownloadResult>>, - ) -> bool { - if let BlockRequest::Downloading { state, .. } = self { - state.insert_verified_response(result) - } else { - // The block already transitioned past Downloading (e.g. a child arrived while the - // block was already being processed). Silently dropping would be hard to debug if - // we ever reach this path unexpectedly — log it. - debug!( - state = ?self, - "insert_verified_response called outside Downloading state, dropping" - ); - false - } + self.state.is_processed() } } #[derive(Debug)] -struct DataRequest { - peers: PeerSet, - state: DataRequestState, -} - -#[derive(Debug)] -enum DataRequestState { - /// Data downloading or awaiting download - Downloading(DataDownload), - /// Data downloaded, waiting for block processing to complete before import - Downloaded { - data: DownloadedData, - peer_group: PeerGroup, +enum DataRequest { + WaitingForBlock, + Request { + slot: Slot, + /// Peers to fetch the data columns from. Pre-Gloas this is the lookup's `peers`; for FULL + /// Gloas blocks this is the `gloas_child_peers` set proven to hold the columns. + peers: PeerSet, + state: SingleLookupRequestState>, }, - /// Data sent for processing, awaiting result - Processing { peer_group: PeerGroup }, - /// Data processing complete (or no data needed) - Complete, + NoData, } impl DataRequest { fn is_complete(&self) -> bool { - matches!(self.state, DataRequestState::Complete) - } -} - -impl DataRequestState { - fn is_awaiting_event(&self) -> bool { match &self { - Self::Downloading(dl) => dl.is_awaiting_event(), - Self::Processing { .. } => true, - _ => false, + DataRequest::WaitingForBlock => false, + DataRequest::Request { state, .. } => state.is_processed(), + DataRequest::NoData => true, } } } -/// Fork-dependent data download state +/// Tracks the download + processing of a Gloas execution payload envelope. For FULL Gloas blocks the +/// execution payload arrives as a separate `SignedExecutionPayloadEnvelope`, mirroring the way data +/// columns are fetched and processed by `DataRequest`. #[derive(Debug)] -enum DataDownload { - Columns { - block_root: Hash256, - slot: Slot, - state: SingleLookupRequestState>, - }, -} - -impl DataDownload { - fn send_request>( - &mut self, - id: Id, +enum PayloadRequest { + /// Block not yet downloaded, can't tell if a payload is needed. + WaitingForBlock, + /// Post-Gloas block: an execution payload envelope must be fetched and processed *if* the block + /// is FULL. We can't tell FULL from EMPTY from the block alone: only a FULL child of this block + /// proves a payload was published, which is signalled by `peers` becoming non-empty. While + /// `peers` is empty the block is assumed EMPTY and this request is considered complete. + Request { peers: PeerSet, - cx: &mut SyncNetworkContext, - ) -> Result<(), LookupRequestError> { - match self { - DataDownload::Columns { - block_root, - slot, - state, - } => { - let br = *block_root; - state.make_request(|| cx.custody_lookup_request(id, br, *slot, peers)) - } - } - } - - fn is_completed(&self) -> bool { - match self { - DataDownload::Columns { state, .. } => state.is_completed(), - } - } - - fn take_download_result(&mut self) -> Option<(DownloadedData, PeerGroup)> { - match self { - DataDownload::Columns { state, .. } => state - .take_download_result() - .map(|r| (DownloadedData::Columns(r.value), r.peer_group)), - } - } - - fn is_awaiting_event(&self) -> bool { - match self { - DataDownload::Columns { state, .. } => state.is_awaiting_event(), - } - } -} - -/// Downloaded data, waiting to be sent for processing -#[derive(Debug)] -enum DownloadedData { - Columns(DataColumnSidecarList), -} - -impl DownloadedData { - fn send_for_processing>( - &self, - id: Id, - block_root: Hash256, - cx: &mut SyncNetworkContext, - ) -> Result<(), SendErrorProcessor> { - match self { - DownloadedData::Columns(columns) => cx.send_custody_columns_for_processing( - id, - block_root, - columns.clone(), - Duration::ZERO, - BlockProcessType::SingleCustodyColumn(id), - ), - } - } -} - -#[derive(Debug)] -struct PayloadRequest { - peers: PeerSet, - state: PayloadRequestState, -} - -#[derive(Educe)] -#[educe(Debug)] -enum PayloadRequestState { - Downloading { state: SingleLookupRequestState>>, }, - Downloaded { - #[educe(Debug(ignore))] - envelope: Arc>, - peer_group: PeerGroup, - }, - Processing { - peer_group: PeerGroup, - }, - /// Payload processed, or no payload needed. - Complete, + /// Pre-Gloas block: no payload envelope exists, nothing to fetch. + PreGloas, } impl PayloadRequest { fn is_complete(&self) -> bool { - if !self.state.is_awaiting_event() && self.peers.read().is_empty() { - return true; - } - matches!(self.state, PayloadRequestState::Complete) - } -} - -impl PayloadRequestState { - fn is_awaiting_event(&self) -> bool { - match self { - Self::Downloading { state, .. } => state.is_awaiting_event(), - Self::Processing { .. } => true, - _ => false, - } - } -} - -impl DataRequestState { - fn new( - slot: Slot, - block_root: Hash256, - expected_blobs: usize, - failed_processing: u8, - spec: &ChainSpec, - ) -> Self { - let block_fork = spec.fork_name_at_slot::(slot); - - match block_fork { - ForkName::Base - | ForkName::Altair - | ForkName::Bellatrix - | ForkName::Capella - | ForkName::Deneb - | ForkName::Electra => Self::Complete, - ForkName::Fulu => { - if expected_blobs > 0 { - Self::Downloading(DataDownload::Columns { - block_root, - slot, - state: SingleLookupRequestState::new_with_processing_failures( - failed_processing, - ), - }) - } else { - Self::Complete - } - } - ForkName::Gloas => { - if expected_blobs > 0 { - Self::Downloading(DataDownload::Columns { - block_root, - slot, - state: SingleLookupRequestState::new_with_processing_failures( - failed_processing, - ), - }) - // Gloas: data peers start at 0, populated when children arrive - } else { - Self::Complete + match &self { + PayloadRequest::WaitingForBlock => false, + PayloadRequest::Request { peers, state } => { + // EMPTY Gloas block: no FULL child has proven a payload exists, so there is nothing + // to fetch and the request never made it past `AwaitingDownload`. + if !state.is_awaiting_event() && peers.read().is_empty() { + return true; } + state.is_processed() } + PayloadRequest::PreGloas => true, } } } -impl PayloadRequestState { - /// Create payload request based on the downloaded block's content and fork. - fn new(slot: Slot, spec: &ChainSpec) -> Self { - // Genesis has no execution payload envelope by definition, regardless of fork. - if slot == spec.genesis_slot { - return Self::Complete; - } - - let block_fork = spec.fork_name_at_slot::(slot); - - match block_fork { - ForkName::Base - | ForkName::Altair - | ForkName::Bellatrix - | ForkName::Capella - | ForkName::Deneb - | ForkName::Electra - | ForkName::Fulu => Self::Complete, - ForkName::Gloas => Self::Downloading { - state: SingleLookupRequestState::new(), - }, - } - } +/// Classifies how a peer relates to a lookup, controlling which peer set it is added to. +pub enum PeerType { + /// Pre-Gloas: the peer can serve the block and its data columns. + PreGloas, + /// Post-Gloas: the peer claims to have imported a child of this block whose bid references + /// `ExecutionBlockHash` as its parent. Such peers can serve this block's payload envelope and + /// data columns (only if this block is FULL). + PostGloas(ExecutionBlockHash), } -type PeerSet = Arc>>; -type GloasChildPeers = Arc>>; - #[derive(Educe)] #[educe(Debug(bound(T: BeaconChainTypes)))] pub struct SingleBlockLookup { pub id: Id, block_root: Hash256, - - // Block request — always present block_request: BlockRequest, - - // Data request — starts as WaitingForBlock, set after block downloaded - data_request: Option>, - - // Payload request — starts as WaitingForBlock, set after block downloaded - payload_request: Option>, - - // Peer sets. - // - // `Arc>` is required by `ActiveCustodyRequest` (columns only), which lives - // in `SyncNetworkContext` and needs to observe peers being added/removed at runtime - // while it's in flight. `data_peers` and `payload_peers` use the same shape purely for - // consistency so all three sets plug into the same `add_peer` / `remove_peer` surface. - /// Peers for block download (also used for data in pre-Gloas forks). + data_request: DataRequest, + payload_request: PayloadRequest, + /// Peers that claim to have imported this set of block components. This state is shared with + /// the custody request to have an updated view of the peers that claim to have imported the + /// block associated with this lookup. The peer set of a lookup can change rapidly, and faster + /// than the lifetime of a custody request. #[educe(Debug(method(fmt_peer_set_as_len)))] peers: PeerSet, - /// Peers for payload download (0 initially, Gloas only). + /// Post-Gloas only: peers that claim to have imported a FULL child of this block, keyed by the + /// child's bid `parent_block_hash`. These (not `peers`) are the peers proven to hold this + /// block's payload envelope and data columns. #[educe(Debug(method(fmt_peer_map_as_len)))] gloas_child_peers: GloasChildPeers, - - // Parent tracking - awaiting_parent: Option, + awaiting_parent: Option, + /// Post-Gloas only: this block's bid `parent_block_hash` (the parent's execution hash). Used to + /// derive the `PeerType` when propagating peers up to the parent lookup. + awaiting_parent_bid_hash: Option, created: Instant, pub(crate) span: Span, - - // Retry tracking - failed_processing: u8, -} - -pub enum PeerType { - PreGloas, - PostGloas(ExecutionBlockHash), -} - -impl PeerType { - pub fn from_awaiting_parent(awaiting_parent: AwaitingParent) -> Self { - match awaiting_parent.gloas_bid_parent_hash() { - Some(parent_hash) => Self::PostGloas(parent_hash), - None => Self::PreGloas, - } - } } impl SingleBlockLookup { @@ -520,7 +190,7 @@ impl SingleBlockLookup { peers: &[PeerId], peer_type: &PeerType, id: Id, - awaiting_parent: Option, + awaiting_parent: Option, ) -> Self { let lookup_span = debug_span!( "lh_single_block_lookup", @@ -530,7 +200,6 @@ impl SingleBlockLookup { let block_peers: PeerSet = Arc::new(RwLock::new(peers.iter().copied().collect())); let mut gloas_child_peers = HashMap::new(); - match peer_type { PeerType::PreGloas => {} PeerType::PostGloas(execution_hash) => { @@ -541,32 +210,31 @@ impl SingleBlockLookup { Self { id, block_root: requested_block_root, - block_request: BlockRequest::new(requested_block_root), - data_request: None, - payload_request: None, + block_request: BlockRequest::new(), + data_request: DataRequest::WaitingForBlock, + payload_request: PayloadRequest::WaitingForBlock, peers: block_peers, gloas_child_peers: Arc::new(RwLock::new(gloas_child_peers)), awaiting_parent, + awaiting_parent_bid_hash: None, created: Instant::now(), - failed_processing: 0, span: lookup_span, } } /// Reset the status of all requests (used on block processing failure) pub fn reset_requests(&mut self) { - // Increment processing failure counter (we're resetting due to processing error) - self.failed_processing = self.failed_processing.saturating_add(1); - // Reset to fresh Downloading state with the updated counter - self.block_request = - BlockRequest::new_with_processing_failures(self.block_root, self.failed_processing); - self.data_request = None; - self.payload_request = None; + self.block_request = BlockRequest::new(); + self.data_request = DataRequest::WaitingForBlock; + self.payload_request = PayloadRequest::WaitingForBlock; } /// Return the slot of this lookup's block if it's currently cached pub fn peek_downloaded_block_slot(&self) -> Option { - self.block_request.peek_slot() + self.block_request + .state + .peek_downloaded_data() + .map(|block| block.slot()) } /// Get the block root that is being requested. @@ -574,19 +242,35 @@ impl SingleBlockLookup { self.block_root } - /// Check the block root matches the requested block root. - pub fn is_for_block(&self, block_root: Hash256) -> bool { - self.block_root == block_root + pub fn awaiting_parent(&self) -> Option { + self.awaiting_parent } - pub fn awaiting_parent(&self) -> Option { - self.awaiting_parent + /// Mark this lookup as awaiting a parent lookup from being processed. Meanwhile don't send + /// components for processing. `parent_block_hash` is the block's bid `parent_block_hash` + /// (post-Gloas only), used to partition the parent lookup's peers. + pub fn set_awaiting_parent( + &mut self, + parent_root: Hash256, + parent_block_hash: Option, + ) { + self.awaiting_parent = Some(parent_root); + self.awaiting_parent_bid_hash = parent_block_hash; } /// Mark this lookup as no longer awaiting a parent lookup. Components can be sent for /// processing. pub fn resolve_awaiting_parent(&mut self) { self.awaiting_parent = None; + self.awaiting_parent_bid_hash = None; + } + + /// Returns the `PeerType` to use when propagating this lookup's peers up to its parent lookup. + pub fn awaiting_parent_peer_type(&self) -> PeerType { + match self.awaiting_parent_bid_hash { + Some(execution_hash) => PeerType::PostGloas(execution_hash), + None => PeerType::PreGloas, + } } /// Returns the time elapsed since this lookup was created @@ -597,246 +281,150 @@ impl SingleBlockLookup { /// Maybe insert a verified response into this lookup. Returns true if imported pub fn add_child_components(&mut self, block_component: BlockComponent) -> bool { match block_component { - BlockComponent::Block(block) => self.block_request.insert_verified_response(block), + BlockComponent::Block(block) => { + self.block_request.state.insert_verified_response(block) + } BlockComponent::Sidecar => { - // For now ignore single blobs and columns, as the blob request state assumes all - // blobs are attributed to the same peer = the peer serving the remaining blobs. + // There's nothing to do here, there's no component to insert. The lookup downloads + // its required data columns itself once it has the block. false } } } + /// Check the block root matches the requested block root. + pub fn is_for_block(&self, block_root: Hash256) -> bool { + self.block_root() == block_root + } + /// Returns true if this request is expecting some event to make progress pub fn is_awaiting_event(&self) -> bool { self.awaiting_parent.is_some() - || self.block_request.is_awaiting_event() + || self.block_request.state.is_awaiting_event() || match &self.data_request { - Some(request) => request.state.is_awaiting_event(), - None => true, + DataRequest::WaitingForBlock => true, + DataRequest::Request { state, .. } => state.is_awaiting_event(), + DataRequest::NoData => false, } || match &self.payload_request { - Some(request) => request.state.is_awaiting_event(), - None => true, + PayloadRequest::WaitingForBlock => true, + PayloadRequest::Request { state, .. } => state.is_awaiting_event(), + PayloadRequest::PreGloas => false, } } /// Makes progress on all requests of this lookup. Any error is not recoverable and must result /// in dropping the lookup. May mark the lookup as completed. - /// - /// Each of the block / data / payload sub-state-machines is driven inside its own `loop` - /// so that synchronous state transitions (e.g. Downloading → Downloaded → Processing) run - /// without returning. Each loop `break`s when further progress requires an external event - /// (download response, processing result, or a parent lookup to resolve). pub fn continue_requests( &mut self, cx: &mut SyncNetworkContext, ) -> Result { let _guard = self.span.clone().entered(); - let id = self.id; - let block_root = self.block_root; // === Block request === - loop { - match &mut self.block_request { - BlockRequest::Downloading { state, .. } => { - let peers = self.peers.clone(); - state.make_request(|| cx.block_lookup_request(id, peers, block_root))?; - - if state.is_completed() { - // Block is fully execution-validated and cached in the da_checker or fully - // imported. - // The block MUST be somewhere... and the code below needs to block to know - // if it should fetch data - let block = match cx.chain.get_block_process_status(&block_root) { - BlockProcessStatus::NotValidated(block, _) - | BlockProcessStatus::ExecutionValidated(block) => block, - BlockProcessStatus::Unknown => { - // Race: the block was imported into fork-choice between - // `block_lookup_request` and this check. All components must - // have landed with it, so the lookup has nothing left to do. - panic!("We have to find the block somewhere"); - } - }; - // No peer to attribute against on a cache hit. - self.block_request = BlockRequest::Complete { block, peer: None }; - } else if let Some(result) = state.take_download_result() { - // Block download requests are sent to a single peer, so the returned - // PeerGroup contains exactly one entry. Take the first and only. - let peer = result.peer_group.all().next().copied().ok_or_else(|| { - LookupRequestError::BadState("block download has no peer".into()) - })?; - self.block_request = BlockRequest::Downloaded { - block: result.value, - peer, - }; - } else { - // Awaiting download - break; - } - } - BlockRequest::Downloaded { block, peer } => { - if self.awaiting_parent.is_some() { - break; - } - - // Check if the parent block is known to fork-choice. If the block is FULL - // expect the payload to be imported too. - if !cx - .chain - .canonical_head - .fork_choice_read_lock() - .is_parent_imported(block) - { - // Parent block is unknown, or it's FULL and the parent's payload has not - // been imported yet. Park this lookup until the parent resolves. - let awaiting_parent = AwaitingParent::from_block(block); - self.awaiting_parent = Some(awaiting_parent); - return Ok(LookupResult::ParentUnknown { - awaiting_parent, - block_root: self.block_root, - peers: self.all_peers(), - }); - } - - let block = block.clone(); - let peer = *peer; - cx.send_block_for_processing( - id, - self.block_root, - block.clone(), - Duration::ZERO, - ) - .map_err(LookupRequestError::SendFailedProcessor)?; - self.block_request = BlockRequest::Processing { block, peer }; - // Processing needs an async trigger (block processing result) before we - // can make progress. - break; - } - BlockRequest::Processing { .. } | BlockRequest::Complete { .. } => break, - } + self.block_request.state.maybe_start_downloading(|| { + cx.block_lookup_request(self.id, self.peers.clone(), self.block_root) + })?; + if self.awaiting_parent.is_none() + && let Some(data) = self.block_request.state.maybe_start_processing() + { + cx.send_block_for_processing(self.id, self.block_root, data.value, data.seen_timestamp) + .map_err(LookupRequestError::SendFailedProcessor)?; } // === Data request === loop { match &mut self.data_request { - // None = waiting for block - None => { - let Some(block) = self.block_request.peek_block_or_cached(block_root, cx) - else { + DataRequest::WaitingForBlock => { + if let Some(block) = self.block_request.state.peek_downloaded_data() { + let block_epoch = block + .slot() + .epoch(::EthSpec::slots_per_epoch()); + self.data_request = if block.num_expected_blobs() == 0 { + DataRequest::NoData + } else if cx.chain.should_fetch_custody_columns(block_epoch) { + let slot = block.slot(); + // Post-Gloas data columns are served by the FULL children's peers, not + // by `self.peers`. Pre-Gloas this returns `self.peers` unchanged. + let peers = self.get_data_peers(block); + DataRequest::Request { + slot, + peers, + state: SingleLookupRequestState::new(), + } + } else { + DataRequest::NoData + }; + } else { break; - }; - let peers = self.get_data_peers::(&block); - self.data_request = Some(DataRequest { - peers, - state: DataRequestState::new( - block.slot(), + } + } + DataRequest::Request { slot, peers, state } => { + state.maybe_start_downloading(|| { + cx.custody_lookup_request(self.id, self.block_root, *slot, peers.clone()) + })?; + // Wait for the parent to be imported, data column processing result handle does + // not support `ParentUnknown`. + if self.awaiting_parent.is_none() + && let Some(data) = state.maybe_start_processing() + { + cx.send_custody_columns_for_processing( + self.id, self.block_root, - block.num_expected_blobs(), - self.failed_processing, - cx.spec(), - ), - }); - } - Some(request) => match &mut request.state { - DataRequestState::Downloading(dl) => { - // Custody column downloads dispatch against the global synced peer pool - // inside `ActiveCustodyRequest`, not against `data_peers`. Only gate on - // `data_peers` for post-Gloas, where peer sets are strictly partitioned - // and no fallback pool exists. - dl.send_request(id, request.peers.clone(), cx)?; - - if dl.is_completed() { - // All data already imported (e.g. received via gossip) - request.state = DataRequestState::Complete; - } else if let Some((data, peer_group)) = dl.take_download_result() { - request.state = DataRequestState::Downloaded { data, peer_group }; - } else { - // Wait for data to be downloaded - break; - } - } - DataRequestState::Downloaded { data, peer_group } => { - data.send_for_processing(id, self.block_root, cx) - .map_err(LookupRequestError::SendFailedProcessor)?; - let peer_group = peer_group.clone(); - request.state = DataRequestState::Processing { peer_group }; - // Processing needs an async trigger. - break; - } - DataRequestState::Processing { .. } | DataRequestState::Complete => break, - }, - } - } - - // === Payload request === - loop { - match &mut self.payload_request { - None => { - let Some(block) = self.block_request.peek_block_or_cached(block_root, cx) - else { - break; - }; - let peers = self.get_data_peers(&block); - self.payload_request = Some(PayloadRequest { - peers, - state: PayloadRequestState::new(block.slot(), cx.spec()), - }); - } - Some(request) => match &mut request.state { - PayloadRequestState::Downloading { state, .. } => { - // Peers in `request.peers` are those that have signalled they imported a - // child of this block whose bid's parent_hash matches our execution_hash — - // i.e. they are proven to have the envelope. `make_request` is a no-op if - // a request is already in flight, so it's safe to call on every tick. - let peers = request.peers.clone(); - state.make_request(|| cx.payload_lookup_request(id, peers, block_root))?; - - if state.is_completed() { - // Envelope already known to fork-choice (NoRequestNeeded). - request.state = PayloadRequestState::Complete; - continue; - } - if let Some(result) = state.take_download_result() { - request.state = PayloadRequestState::Downloaded { - envelope: result.value, - peer_group: result.peer_group, - }; - } else { - break; - } - } - PayloadRequestState::Downloaded { - envelope, - peer_group, - } => { - if !self.block_request.is_complete() { - break; - } - let envelope = envelope.clone(); - let peer_group = peer_group.clone(); - cx.send_payload_for_processing( - block_root, - envelope, - Duration::ZERO, - BlockProcessType::SinglePayloadEnvelope(id), + data.value, + data.seen_timestamp, + BlockProcessType::SingleCustodyColumn(self.id), ) .map_err(LookupRequestError::SendFailedProcessor)?; - request.state = PayloadRequestState::Processing { peer_group }; - // Processing needs an async trigger. - break; } - PayloadRequestState::Processing { .. } | PayloadRequestState::Complete => break, - }, + break; + } + DataRequest::NoData => break, } } - // === Check completion === + // === Payload request (Gloas only) === + loop { + match &mut self.payload_request { + PayloadRequest::WaitingForBlock => { + if let Some(block) = self.block_request.state.peek_downloaded_data() { + self.payload_request = if block.fork_name_unchecked().gloas_enabled() { + PayloadRequest::Request { + peers: self.get_data_peers(block), + state: SingleLookupRequestState::new(), + } + } else { + PayloadRequest::PreGloas + }; + } else { + break; + } + } + PayloadRequest::Request { peers, state } => { + state.maybe_start_downloading(|| { + cx.payload_lookup_request(self.id, peers.clone(), self.block_root) + })?; + if let Some(data) = state.maybe_start_processing() { + cx.send_payload_for_processing( + self.block_root, + data.value, + data.seen_timestamp, + BlockProcessType::SinglePayloadEnvelope(self.id), + ) + .map_err(LookupRequestError::SendFailedProcessor)?; + } + break; + } + PayloadRequest::PreGloas => break, + } + } + + // If all components of this lookup are already processed, there will be no future events + // that can make progress so it must be dropped. Consider the lookup completed. + // This case can happen if we receive the components from gossip during a retry. if self.block_request.is_complete() - && self.data_request.as_ref().is_some_and(|r| r.is_complete()) - && self - .payload_request - .as_ref() - .is_some_and(|r| r.is_complete()) + && self.data_request.is_complete() + && self.payload_request.is_complete() { return Ok(LookupResult::Completed); } @@ -844,139 +432,91 @@ impl SingleBlockLookup { Ok(LookupResult::Pending) } - fn get_data_peers(&self, block: &SignedBeaconBlock) -> PeerSet { - if let Ok(bid) = block.message().body().signed_execution_payload_bid() { - // For Gloas, the child-attested peer set for this bid is the canonical peer set. - self.gloas_child_peers + /// Returns the peers that should serve this block's data columns and payload envelope. For FULL + /// Gloas blocks these are the peers that claimed to have imported a FULL child of this block + /// (keyed by this block's bid `block_hash`). Pre-Gloas blocks carry no bid, so this returns the + /// lookup's `peers` unchanged. + fn get_data_peers(&self, block: &SignedBeaconBlock) -> PeerSet { + match block.payload_bid_block_hash() { + // Gloas: the child-attested peer set for this bid is the canonical peer set. DO NOT + // default to `self.peers`: post-Gloas `self.peers` have not claimed to import this + // block's data nor its payload. This set may remain empty until a FULL child arrives. + Ok(block_hash) => self + .gloas_child_peers .write() - .entry(bid.message.block_hash) + .entry(block_hash) .or_default() - .clone() - // DO NOT DEFAULT TO `self.peers` HERE! Post gloas `self.peers` have not claimed to - // import the block's data nor the payload. This PeerSet may remain empty until we - // receive a FULL child of this lookup. - } else { - self.peers.clone() + .clone(), + Err(_) => self.peers.clone(), } } - // -- Processing result handlers -- - /// Handle block processing result. Advances the lookup state machine. pub fn on_block_processing_result( &mut self, result: BlockProcessingResult, cx: &mut SyncNetworkContext, ) -> Result { - let BlockRequest::Processing { block, peer } = &self.block_request else { - return Err(LookupRequestError::BadState( - "block processing result but not in Processing state".to_owned(), - )); - }; - let block_peer = *peer; - match result { - BlockProcessingResult::Imported(_) => { - let block = block.clone(); - self.block_request = BlockRequest::Complete { - block, - peer: Some(block_peer), - }; - self.continue_requests(cx) + BlockProcessingResult::Imported(_fully_imported, _info) => { + self.block_request.state.on_processing_success()?; } - BlockProcessingResult::Error { penalty, reason } => { - if let Some((action, whom)) = penalty { - whom.apply(action, &PeerGroup::from_single(block_peer), reason, cx); + BlockProcessingResult::ParentUnknown { + parent_root, + parent_block_hash, + } => { + // `BlockError::ParentUnknown` is only returned when processing blocks. Revert the + // block request to `Downloaded` and park this lookup until the parent resolves; a + // future call to `continue_requests` will re-submit the block for processing once + // the parent lookup completes. + self.block_request.state.revert_to_awaiting_processing()?; + self.set_awaiting_parent(parent_root, parent_block_hash); + return Ok(LookupResult::ParentUnknown { + parent_root, + parent_block_hash, + block_root: self.block_root, + peers: self.all_peers(), + }); + } + BlockProcessingResult::Error { penalty, .. } => { + let peers = self.block_request.state.on_processing_failure()?; + if let Some((action, whom, msg)) = penalty { + whom.apply(action, &peers, msg, cx); } - // Block processing failed — reset everything and retry from scratch. - self.reset_requests(); - self.continue_requests(cx) } } + self.continue_requests(cx) } - /// Handle data processing result (blobs or custody columns imported). + /// Handle data processing result pub fn on_data_processing_result( &mut self, result: BlockProcessingResult, cx: &mut SyncNetworkContext, ) -> Result { - let Some(DataRequest { - state: DataRequestState::Processing { peer_group }, - .. - }) = &self.data_request - else { - return Err(LookupRequestError::BadState( - "data processing result but not in Processing state".to_owned(), - )); + let DataRequest::Request { state, .. } = &mut self.data_request else { + return Err(LookupRequestError::BadState("no data_request".to_owned())); }; - let peer_group = peer_group.clone(); match result { - BlockProcessingResult::Imported(_) => { - if let Some(req) = &mut self.data_request { - req.state = DataRequestState::Complete; - } - self.continue_requests(cx) + BlockProcessingResult::Imported(_fully_imported, _info) => { + state.on_processing_success()?; } - BlockProcessingResult::Error { penalty, reason } => { - if let Some((action, whom)) = penalty { - whom.apply(action, &peer_group, reason, cx); + BlockProcessingResult::ParentUnknown { .. } => { + return Err(LookupRequestError::BadState( + "data processing returned ParentUnknown".to_owned(), + )); + } + BlockProcessingResult::Error { penalty, .. } => { + let peers = state.on_processing_failure()?; + if let Some((action, whom, msg)) = penalty { + whom.apply(action, &peers, msg, cx); } - // Data processing failed — bump the shared processing-failure counter and rebuild - // the data request so retries stay bounded against MAX_ATTEMPTS. - self.failed_processing = self.failed_processing.saturating_add(1); - self.data_request = None; - self.continue_requests(cx) } } + self.continue_requests(cx) } - /// Handle payload envelope processing result (Gloas only). - pub fn on_payload_processing_result( - &mut self, - result: BlockProcessingResult, - cx: &mut SyncNetworkContext, - ) -> Result { - let Some(PayloadRequest { - state: PayloadRequestState::Processing { peer_group }, - .. - }) = &self.payload_request - else { - return Err(LookupRequestError::BadState( - "payload processing result but not in Processing state".to_owned(), - )); - }; - let peer_group = peer_group.clone(); - - match result { - BlockProcessingResult::Imported(_) => { - if let Some(req) = &mut self.payload_request { - req.state = PayloadRequestState::Complete; - } - self.continue_requests(cx) - } - BlockProcessingResult::Error { penalty, reason } => { - if let Some((action, whom)) = penalty { - whom.apply(action, &peer_group, reason, cx); - } - // Bump the shared processing-failure counter so retries stay bounded against - // MAX_ATTEMPTS, then transition back to Downloading to redownload from another peer. - self.failed_processing = self.failed_processing.saturating_add(1); - if let Some(req) = &mut self.payload_request { - req.state = PayloadRequestState::Downloading { - state: SingleLookupRequestState::new_with_processing_failures( - self.failed_processing, - ), - }; - } - self.continue_requests(cx) - } - } - } - - // -- Download response handlers -- - /// Handle a block download response. Updates download state and advances the lookup. pub fn on_block_download_response( &mut self, @@ -984,12 +524,9 @@ impl SingleBlockLookup { result: BlockDownloadResponse, cx: &mut SyncNetworkContext, ) -> Result { - let BlockRequest::Downloading { state, .. } = &mut self.block_request else { - return Err(LookupRequestError::BadState( - "block response but not downloading".to_owned(), - )); - }; - state.on_download_response(req_id, self.block_root, result)?; + self.block_request + .state + .on_download_response(req_id, result)?; self.continue_requests(cx) } @@ -1000,16 +537,42 @@ impl SingleBlockLookup { result: CustodyDownloadResponse, cx: &mut SyncNetworkContext, ) -> Result { - let Some(DataRequest { - state: DataRequestState::Downloading(DataDownload::Columns { state, .. }), - .. - }) = &mut self.data_request - else { + let DataRequest::Request { state, .. } = &mut self.data_request else { + return Err(LookupRequestError::BadState("no data_request".to_owned())); + }; + + state.on_download_response(req_id, result)?; + self.continue_requests(cx) + } + + /// Handle payload envelope processing result (Gloas only). + pub fn on_payload_processing_result( + &mut self, + result: BlockProcessingResult, + cx: &mut SyncNetworkContext, + ) -> Result { + let PayloadRequest::Request { state, .. } = &mut self.payload_request else { return Err(LookupRequestError::BadState( - "custody response but not downloading columns".to_owned(), + "no payload_request".to_owned(), )); }; - state.on_download_response(req_id, self.block_root, result)?; + + match result { + BlockProcessingResult::Imported(_fully_imported, _info) => { + state.on_processing_success()?; + } + BlockProcessingResult::ParentUnknown { .. } => { + return Err(LookupRequestError::BadState( + "payload processing returned ParentUnknown".to_owned(), + )); + } + BlockProcessingResult::Error { penalty, .. } => { + let peers = state.on_processing_failure()?; + if let Some((action, whom, msg)) = penalty { + whom.apply(action, &peers, msg, cx); + } + } + } self.continue_requests(cx) } @@ -1020,16 +583,13 @@ impl SingleBlockLookup { result: PayloadDownloadResponse, cx: &mut SyncNetworkContext, ) -> Result { - let Some(PayloadRequest { - state: PayloadRequestState::Downloading { state, .. }, - .. - }) = &mut self.payload_request - else { + let PayloadRequest::Request { state, .. } = &mut self.payload_request else { return Err(LookupRequestError::BadState( - "payload envelope response but not downloading payload".to_owned(), + "no payload_request".to_owned(), )); }; - state.on_download_response(req_id, self.block_root, result)?; + + state.on_download_response(req_id, result)?; self.continue_requests(cx) } @@ -1042,12 +602,11 @@ impl SingleBlockLookup { /// Returns true if the peer was newly inserted into any peer set. pub fn add_peer(&mut self, peer_id: PeerId, peer_type: &PeerType) -> bool { let mut added = false; - match peer_type { PeerType::PostGloas(execution_hash) => { - // This peer claims to have imported a child of this block with parent_hash. We - // can't know whether the child is full or empty until we know the payload hash of - // this lookup. + // This peer claims to have imported a FULL child of this block whose bid references + // `execution_hash` as its parent. It is therefore proven to hold this block's + // payload envelope and data columns. added |= self .gloas_child_peers .write() @@ -1058,8 +617,7 @@ impl SingleBlockLookup { } PeerType::PreGloas => {} } - - // Always add to the main block peers + // Always add to the main block peers, they can at least serve the block. added |= self.peers.write().insert(peer_id); added } @@ -1073,121 +631,155 @@ impl SingleBlockLookup { } /// Returns true if this lookup has zero peers - pub fn has_peers(&self) -> bool { - if !self.peers.read().is_empty() { - return true; - } + pub fn has_no_peers(&self) -> bool { + self.peers.read().is_empty() + && self + .gloas_child_peers + .read() + .values() + .all(|set| set.read().is_empty()) + } +} - let gloas_child_peers = self.gloas_child_peers.read(); - !gloas_child_peers.is_empty() - && gloas_child_peers.values().any(|set| !set.read().is_empty()) +#[derive(Debug, Clone)] +pub struct DownloadResult { + pub value: T, + pub seen_timestamp: Duration, + pub peer_group: PeerGroup, +} + +impl DownloadResult { + pub fn new(value: T, peer_group: PeerGroup, seen_timestamp: Duration) -> Self { + Self { + value, + seen_timestamp, + peer_group, + } } } #[derive(IntoStaticStr)] -enum DownloadState { +pub enum State { AwaitingDownload(/* reason */ &'static str), Downloading(ReqId), - Downloaded(DownloadResult), - /// Download completed with no request needed (e.g. all components already imported) - Completed(/* reason */ &'static str), + AwaitingProcess(DownloadResult), + /// Request is processing, sent by lookup sync + Processing(DownloadResult), + /// Request is processed + Processed(/* reason */ &'static str, T), } /// Object representing the state of a single block or blob lookup request. #[derive(Debug)] -struct SingleLookupRequestState { - state: DownloadState, +pub struct SingleLookupRequestState { + /// State of this request. + state: State, + /// How many times have we attempted to process this block or blob. failed_processing: u8, + /// How many times have we attempted to download this block or blob. failed_downloading: u8, } impl SingleLookupRequestState { - fn new() -> Self { + pub fn new() -> Self { Self { - state: DownloadState::AwaitingDownload("not started"), + state: State::AwaitingDownload("not started"), failed_processing: 0, failed_downloading: 0, } } - fn new_with_processing_failures(failed_processing: u8) -> Self { - Self { - state: DownloadState::AwaitingDownload("reset after processing failure"), - failed_processing, - failed_downloading: 0, + pub fn is_awaiting_download(&self) -> bool { + match self.state { + State::AwaitingDownload { .. } => true, + State::Downloading { .. } + | State::AwaitingProcess { .. } + | State::Processing { .. } + | State::Processed { .. } => false, } } - fn is_awaiting_download(&self) -> bool { - matches!(self.state, DownloadState::AwaitingDownload { .. }) + pub fn is_processed(&self) -> bool { + match self.state { + State::AwaitingDownload { .. } + | State::Downloading { .. } + | State::AwaitingProcess { .. } + | State::Processing { .. } => false, + State::Processed { .. } => true, + } } - fn is_completed(&self) -> bool { - matches!(self.state, DownloadState::Completed { .. }) + /// Returns true if we can expect some future event to progress this block component request + /// specifically. + pub fn is_awaiting_event(&self) -> bool { + match self.state { + // No event will progress this request specifically, but the request may be put on hold + // due to some external event + State::AwaitingDownload { .. } => false, + // Network will emit a download success / error event + State::Downloading { .. } => true, + // Not awaiting any external event + State::AwaitingProcess { .. } => false, + // Beacon processor will emit a processing result event + State::Processing { .. } => true, + // Request complete, no future event left + State::Processed { .. } => false, + } + } + + pub fn peek_downloaded_data(&self) -> Option<&T> { + match &self.state { + State::AwaitingDownload { .. } => None, + State::Downloading { .. } => None, + State::AwaitingProcess(result) => Some(&result.value), + State::Processing(result) => Some(&result.value), + State::Processed(_, value) => Some(value), + } } /// Drive download: check max attempts, issue request, handle result. - fn make_request( + fn maybe_start_downloading( &mut self, - request_fn: impl FnOnce() -> Result, + request_fn: impl FnOnce() -> Result, RpcRequestSendError>, ) -> Result<(), LookupRequestError> { - if !self.is_awaiting_download() { - return Ok(()); - } - if self.failed_attempts() >= SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS { - let cannot_process = self.more_failed_processing_attempts(); - return Err(LookupRequestError::TooManyAttempts { cannot_process }); - } - match request_fn().map_err(LookupRequestError::SendFailedNetwork)? { - LookupRequestResult::RequestSent(req_id) => self.on_download_start(req_id)?, - LookupRequestResult::NoRequestNeeded(reason) => self.on_completed_request(reason)?, - LookupRequestResult::Pending(reason) => self.update_awaiting_download_status(reason), + if self.is_awaiting_download() { + match request_fn().map_err(LookupRequestError::SendFailedNetwork)? { + LookupRequestResult::RequestSent(req_id) => self.on_download_start(req_id)?, + LookupRequestResult::NoRequestNeeded(reason, value) => { + self.on_completed_request(reason, value)? + } + LookupRequestResult::Pending(reason) => { + self.update_awaiting_download_status(reason) + } + } } Ok(()) } - fn is_awaiting_event(&self) -> bool { - matches!(self.state, DownloadState::Downloading { .. }) - } - - fn peek_downloaded_data(&self) -> Option<&T> { - match &self.state { - DownloadState::Downloaded(data) => Some(&data.value), - _ => None, - } - } - - /// Take the download result out, transitioning back to AwaitingDownload. - /// Returns None if not in Downloaded state. - fn take_download_result(&mut self) -> Option> { - let old = std::mem::replace(&mut self.state, DownloadState::AwaitingDownload("taken")); - if let DownloadState::Downloaded(result) = old { - Some(result) - } else { - self.state = old; - None - } - } - - fn insert_verified_response(&mut self, result: DownloadResult) -> bool { - if let DownloadState::AwaitingDownload { .. } = &self.state { - self.state = DownloadState::Downloaded(result); + /// Switch to `AwaitingProcessing` if the request is in `AwaitingDownload` state, otherwise + /// ignore. + pub fn insert_verified_response(&mut self, result: DownloadResult) -> bool { + if let State::AwaitingDownload { .. } = &self.state { + self.state = State::AwaitingProcess(result); true } else { false } } - fn update_awaiting_download_status(&mut self, new_status: &'static str) { - if let DownloadState::AwaitingDownload(status) = &mut self.state { - *status = new_status; + /// Append metadata on why this request is in AwaitingDownload status. Very helpful to debug + /// stuck lookups. Not fallible as it's purely informational. + pub fn update_awaiting_download_status(&mut self, new_status: &'static str) { + if let State::AwaitingDownload(status) = &mut self.state { + *status = new_status } } - fn on_download_start(&mut self, req_id: ReqId) -> Result<(), LookupRequestError> { + /// Switch to `Downloading` if the request is in `AwaitingDownload` state, otherwise returns None. + pub fn on_download_start(&mut self, req_id: ReqId) -> Result<(), LookupRequestError> { match &self.state { - DownloadState::AwaitingDownload { .. } => { - self.state = DownloadState::Downloading(req_id); + State::AwaitingDownload { .. } => { + self.state = State::Downloading(req_id); Ok(()) } other => Err(LookupRequestError::BadState(format!( @@ -1196,30 +788,22 @@ impl SingleLookupRequestState { } } - /// Handle a download response: dispatch success or failure based on result. - fn on_download_response( + pub fn on_download_response( &mut self, req_id: ReqId, - block_root: Hash256, - result: Result<(T, PeerGroup, Duration), RpcResponseError>, + result: Result, RpcResponseError>, ) -> Result<(), LookupRequestError> { match result { - Ok((value, peer_group, seen_timestamp)) => self.on_download_success( - req_id, - DownloadResult { - value, - block_root, - seen_timestamp, - peer_group, - }, - ), + Ok(result) => self.on_download_success(req_id, result), Err(_) => self.on_download_failure(req_id), } } - fn on_download_failure(&mut self, req_id: ReqId) -> Result<(), LookupRequestError> { + /// Registers a failure in downloading a block. This might be a peer disconnection or a wrong + /// block. + pub fn on_download_failure(&mut self, req_id: ReqId) -> Result<(), LookupRequestError> { match &self.state { - DownloadState::Downloading(expected_req_id) => { + State::Downloading(expected_req_id) => { if req_id != *expected_req_id { return Err(LookupRequestError::UnexpectedRequestId { expected_req_id: *expected_req_id, @@ -1227,7 +811,11 @@ impl SingleLookupRequestState { }); } self.failed_downloading = self.failed_downloading.saturating_add(1); - self.state = DownloadState::AwaitingDownload("not started"); + if self.failed_downloading >= SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS { + return Err(LookupRequestError::TooManyAttempts); + } + + self.state = State::AwaitingDownload("not started"); Ok(()) } other => Err(LookupRequestError::BadState(format!( @@ -1236,20 +824,20 @@ impl SingleLookupRequestState { } } - fn on_download_success( + pub fn on_download_success( &mut self, req_id: ReqId, result: DownloadResult, ) -> Result<(), LookupRequestError> { match &self.state { - DownloadState::Downloading(expected_req_id) => { + State::Downloading(expected_req_id) => { if req_id != *expected_req_id { return Err(LookupRequestError::UnexpectedRequestId { expected_req_id: *expected_req_id, req_id, }); } - self.state = DownloadState::Downloaded(result); + self.state = State::AwaitingProcess(result); Ok(()) } other => Err(LookupRequestError::BadState(format!( @@ -1258,10 +846,72 @@ impl SingleLookupRequestState { } } - fn on_completed_request(&mut self, reason: &'static str) -> Result<(), LookupRequestError> { + /// Switch to `Processing` if the request is in `AwaitingProcess` state, otherwise returns None. + pub fn maybe_start_processing(&mut self) -> Option> { + // For 2 lines replace state with placeholder to gain ownership of `result` match &self.state { - DownloadState::AwaitingDownload { .. } => { - self.state = DownloadState::Completed(reason); + State::AwaitingProcess(result) => { + let result = result.clone(); + self.state = State::Processing(result.clone()); + Some(result) + } + _ => None, + } + } + + /// Revert into `AwaitingProcessing`, if the payload if not invalid and can be submitted for + /// processing latter. + pub fn revert_to_awaiting_processing(&mut self) -> Result<(), LookupRequestError> { + match &self.state { + State::Processing(result) => { + self.state = State::AwaitingProcess(result.clone()); + Ok(()) + } + other => Err(LookupRequestError::BadState(format!( + "Bad state on revert_to_awaiting_processing expected Processing got {other}" + ))), + } + } + + /// Registers a failure in processing a block. + pub fn on_processing_failure(&mut self) -> Result { + match &self.state { + State::Processing(result) => { + let peers_source = result.peer_group.clone(); + self.failed_processing = self.failed_processing.saturating_add(1); + if self.failed_processing >= SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS { + return Err(LookupRequestError::TooManyAttempts); + } + self.state = State::AwaitingDownload("not started"); + Ok(peers_source) + } + other => Err(LookupRequestError::BadState(format!( + "Bad state on_processing_failure expected Processing got {other}" + ))), + } + } + + pub fn on_processing_success(&mut self) -> Result<(), LookupRequestError> { + match &self.state { + State::Processing(data) => { + self.state = State::Processed("processing success", data.value.clone()); + Ok(()) + } + other => Err(LookupRequestError::BadState(format!( + "Bad state on_processing_success expected Processing got {other}" + ))), + } + } + + /// Mark a request as complete without any download or processing + pub fn on_completed_request( + &mut self, + reason: &'static str, + value: T, + ) -> Result<(), LookupRequestError> { + match &self.state { + State::AwaitingDownload { .. } => { + self.state = State::Processed(reason, value); Ok(()) } other => Err(LookupRequestError::BadState(format!( @@ -1269,38 +919,25 @@ impl SingleLookupRequestState { ))), } } - - fn failed_attempts(&self) -> u8 { - self.failed_processing + self.failed_downloading - } - - fn more_failed_processing_attempts(&self) -> bool { - self.failed_processing >= self.failed_downloading - } } -impl std::fmt::Display for DownloadState { +// Display is used in the BadState assertions above +impl std::fmt::Display for State { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", Into::<&'static str>::into(self)) } } -impl std::fmt::Debug for DownloadState { +// Debug is used in the log_stuck_lookups print to include some more info. Implements custom Debug +// to not dump an entire block or blob to terminal which don't add valuable data. +impl std::fmt::Debug for State { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::AwaitingDownload(reason) => write!(f, "AwaitingDownload({})", reason), Self::Downloading(req_id) => write!(f, "Downloading({:?})", req_id), - Self::Downloaded(_) => write!(f, "Downloaded()"), - Self::Completed(reason) => write!(f, "Completed({})", reason), - } - } -} - -impl std::fmt::Display for AwaitingParent { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self.gloas_bid_parent_hash { - Some(parent_hash) => write!(f, "{}/{}", self.parent_root, parent_hash), - None => write!(f, "{}", self.parent_root), + Self::AwaitingProcess(_) => write!(f, "AwaitingProcess"), + Self::Processing(_) => write!(f, "Processing"), + Self::Processed(reason, _) => write!(f, "Processed({})", reason), } } } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index c7b6bd5c8c..04c8980bd6 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -34,16 +34,18 @@ //! search for the block and subsequently search for parents if needed. use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart}; -use super::block_lookups::{BlockLookups, NewLookupTrigger}; +use super::block_lookups::BlockLookups; use super::network_context::{ CustodyByRootResult, RangeBlockComponent, RangeRequestId, RpcEvent, SyncNetworkContext, }; use super::peer_sync_info::{PeerSyncType, remote_sync_type}; use super::range_sync::{EPOCHS_PER_BATCH, RangeSync, RangeSyncType}; -use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProcessor}; +use crate::network_beacon_processor::{ + BlockProcessingResult, ChainSegmentProcessId, NetworkBeaconProcessor, +}; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; -use crate::sync::block_lookups::{AwaitingParent, BlockComponent, DownloadResult}; +use crate::sync::block_lookups::{BlockComponent, DownloadResult}; use crate::sync::custody_backfill_sync::CustodyBackFillSync; use crate::sync::network_context::{PeerGroup, RpcResponseResult}; use beacon_chain::block_verification_types::AsBlock; @@ -69,8 +71,8 @@ use strum::IntoStaticStr; use tokio::sync::mpsc; use tracing::{debug, error, info, trace}; use types::{ - BlobSidecar, ChainSpec, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock, - SignedExecutionPayloadEnvelope, Slot, + BlobSidecar, DataColumnSidecar, EthSpec, ExecutionBlockHash, ForkContext, Hash256, + SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot, }; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync @@ -140,7 +142,8 @@ pub enum SyncMessage { /// A block with an unknown parent has been received. UnknownParentBlock(PeerId, Arc>, Hash256), - /// A sidecar with an unknown parent has been received. + /// A sidecar (full/partial data column) with an unknown parent has been received. Carries only the header + /// info needed to trigger a parent lookup, decoupled from the concrete sidecar type. UnknownParentSidecarHeader { peer_id: PeerId, block_root: Hash256, @@ -204,54 +207,6 @@ impl BlockProcessType { } } -/// The classified outcome of submitting a block / blob / column for processing. The producer -/// (`network_beacon_processor`) translates the raw beacon-chain `Result<_, BlockError>` into this -/// shape so the lookup state machine only has to resolve "which peer to penalize" symbolically. -#[derive(Debug)] -pub enum BlockProcessingResult { - /// Data was imported (or already present, or otherwise satisfies the lookup). `info` is a - /// short stable identifier suitable for debug logs / metrics. - Imported(&'static str), - /// Processing failed. `penalty` is `Some` when an attributable peer should be downscored. - Error { - penalty: Option<(PeerAction, WhichPeerToPenalize)>, - reason: &'static str, - }, -} - -/// Symbolic identifier for the peer(s) the lookup should resolve and downscore. The consumer -/// passes in the relevant `PeerGroup` (a singleton for block processing, the in-flight data peer -/// group for data processing) and `apply` selects from it. -#[derive(Debug, Clone, Copy)] -pub enum WhichPeerToPenalize { - /// All peers in the passed `PeerGroup` (typically a singleton constructed from the block peer - /// or the blob peer — i.e. the peer responsible for the component as a whole). - BlockPeer, - /// The custody peer(s) that served a specific column index in the passed `PeerGroup`. - CustodyPeerForColumn(u64), -} - -impl WhichPeerToPenalize { - /// Resolve this symbolic identifier against `peer_group` and downscore the matching peer(s). - pub fn apply( - self, - action: PeerAction, - peer_group: &crate::sync::network_context::PeerGroup, - reason: &'static str, - cx: &mut crate::sync::network_context::SyncNetworkContext, - ) { - let peers: Vec = match self { - WhichPeerToPenalize::BlockPeer => peer_group.all().copied().collect(), - WhichPeerToPenalize::CustodyPeerForColumn(idx) => { - peer_group.of_index(idx as usize).copied().collect() - } - }; - for peer in peers { - cx.report_peer(peer, action, reason); - } - } -} - /// The result of processing multiple blocks (a chain segment). #[derive(Debug)] pub enum BatchProcessResult { @@ -902,15 +857,18 @@ impl SyncManager { SyncMessage::UnknownParentBlock(peer_id, block, block_root) => { let block_slot = block.slot(); let parent_root = block.parent_root(); + // Post-Gloas: the child's bid `parent_block_hash` lets the parent lookup partition + // peers and know it's FULL. + let parent_block_hash = block.payload_bid_parent_block_hash().ok(); debug!(%block_root, %parent_root, "Received unknown parent block message"); self.handle_unknown_parent( peer_id, block_root, + parent_root, + parent_block_hash, block_slot, - AwaitingParent::from_block(&block), BlockComponent::Block(DownloadResult { value: block.block_cloned(), - block_root, seen_timestamp: self.chain.slot_clock.now_duration().unwrap_or_default(), peer_group: PeerGroup::from_single(peer_id), }), @@ -922,28 +880,17 @@ impl SyncManager { parent_root, slot, } => { - debug!(%block_root, %parent_root, "Received unknown parent sidecar message"); - match AwaitingParent::from_block_header::( + debug!(%block_root, %parent_root, "Received unknown parent sidecar header message"); + self.handle_unknown_parent( + peer_id, + block_root, parent_root, + // No block downloaded yet, so the bid hash is unknown. The correct peer set is + // established once the child's block downloads. + None, slot, - self.spec(), - ) { - Ok(awaiting_parent) => { - self.handle_unknown_parent( - peer_id, - block_root, - slot, - awaiting_parent, - BlockComponent::Sidecar, - ); - } - Err(e) => { - tracing::warn!( - ?e, - "Sent UnknownParentSidecarHeader with post-Gloas sidecar" - ); - } - } + BlockComponent::Sidecar, + ); } SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_root) => { if !self.notified_unknown_roots.contains(&(peer_id, block_root)) { @@ -1023,8 +970,9 @@ impl SyncManager { &mut self, peer_id: PeerId, block_root: Hash256, + parent_root: Hash256, + parent_block_hash: Option, slot: Slot, - awaiting_parent: AwaitingParent, block_component: BlockComponent, ) { match self.should_search_for_block(Some(slot), &peer_id) { @@ -1032,27 +980,22 @@ impl SyncManager { if self.block_lookups.search_child_and_parent( block_root, block_component, - awaiting_parent, + parent_root, + parent_block_hash, peer_id, - NewLookupTrigger::NetworkMessage, &mut self.network, ) { // Lookup created. No need to log here it's logged in `new_current_lookup` } else { debug!( ?block_root, - %awaiting_parent, + ?parent_root, "No lookup created for child and parent" ); } } Err(reason) => { - debug!( - %block_root, - %awaiting_parent, - reason, - "Ignoring unknown parent request" - ); + debug!(%block_root, %parent_root, reason, "Ignoring unknown parent request"); } } } @@ -1063,7 +1006,6 @@ impl SyncManager { if self.block_lookups.search_unknown_block( block_root, &[peer_id], - NewLookupTrigger::NetworkMessage, &mut self.network, ) { // Lookup created. No need to log here it's logged in `new_current_lookup` @@ -1193,7 +1135,7 @@ impl SyncManager { self.block_lookups.on_block_download_response( id, resp.map(|(value, seen_timestamp)| { - (value, PeerGroup::from_single(peer_id), seen_timestamp) + DownloadResult::new(value, PeerGroup::from_single(peer_id), seen_timestamp) }), &mut self.network, ) @@ -1219,6 +1161,26 @@ impl SyncManager { } } + fn rpc_payload_envelope_received( + &mut self, + sync_request_id: SyncRequestId, + peer_id: PeerId, + envelope: Option>>, + seen_timestamp: Duration, + ) { + match sync_request_id { + SyncRequestId::SinglePayloadEnvelope { id } => self + .on_single_payload_envelope_response( + id, + peer_id, + RpcEvent::from_chunk(envelope, seen_timestamp), + ), + _ => { + crit!(%peer_id, "bad request id for payload envelope"); + } + } + } + fn rpc_data_column_received( &mut self, sync_request_id: SyncRequestId, @@ -1247,26 +1209,6 @@ impl SyncManager { } } - fn rpc_payload_envelope_received( - &mut self, - sync_request_id: SyncRequestId, - peer_id: PeerId, - envelope: Option>>, - seen_timestamp: Duration, - ) { - match sync_request_id { - SyncRequestId::SinglePayloadEnvelope { id } => self - .on_single_payload_envelope_response( - id, - peer_id, - RpcEvent::from_chunk(envelope, seen_timestamp), - ), - _ => { - crit!(%peer_id, "bad request id for payload_envelope"); - } - } - } - fn on_single_payload_envelope_response( &mut self, id: SingleLookupReqId, @@ -1280,7 +1222,7 @@ impl SyncManager { self.block_lookups.on_payload_download_response( id, resp.map(|(value, seen_timestamp)| { - (value, PeerGroup::from_single(peer_id), seen_timestamp) + DownloadResult::new(value, PeerGroup::from_single(peer_id), seen_timestamp) }), &mut self.network, ) @@ -1481,8 +1423,4 @@ impl SyncManager { } } } - - fn spec(&self) -> &ChainSpec { - &self.network_globals().spec - } } diff --git a/beacon_node/network/src/sync/mod.rs b/beacon_node/network/src/sync/mod.rs index 054bab654c..f121c1f1b7 100644 --- a/beacon_node/network/src/sync/mod.rs +++ b/beacon_node/network/src/sync/mod.rs @@ -15,4 +15,5 @@ mod range_sync; mod tests; pub use manager::{BatchProcessResult, SyncMessage}; +pub use network_context::{PeerGroup, SyncNetworkContext}; pub use range_sync::ChainId; diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 5e8e68f277..6b7de27dba 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -16,7 +16,7 @@ use crate::network_beacon_processor::TestBeaconChainType; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use crate::sync::batch::ByRangeRequestType; -use crate::sync::block_lookups::SingleLookupId; +use crate::sync::block_lookups::{DownloadResult, SingleLookupId}; use crate::sync::block_sidecar_coupling::CouplingError; use crate::sync::range_data_column_batch_request::RangeDataColumnBatchRequest; use beacon_chain::block_verification_types::LookupBlock; @@ -96,7 +96,7 @@ pub type RpcResponseResult = Result<(T, Duration), RpcResponseError>; /// Duration = latest seen timestamp of all received data columns pub type CustodyByRootResult = - Result<(DataColumnSidecarList, PeerGroup, Duration), RpcResponseError>; + Result>, RpcResponseError>; #[derive(Debug)] #[allow(private_interfaces)] @@ -178,13 +178,13 @@ impl PeerGroup { /// Sequential ID that uniquely identifies ReqResp outgoing requests pub type ReqId = u32; -pub enum LookupRequestResult { +pub enum LookupRequestResult { /// A request is sent. Sync MUST receive an event from the network in the future for either: /// completed response or failed request RequestSent(I), /// No request is sent, and no further action is necessary to consider this request completed. /// Includes a reason why this request is not needed. - NoRequestNeeded(&'static str), + NoRequestNeeded(&'static str, T), /// No request is sent, but the request is not completed. Sync MUST receive some future event /// that makes progress on the request. For example: request is processing from a different /// source (i.e. block received from gossip) and sync MUST receive an event with that processing @@ -826,7 +826,7 @@ impl SyncNetworkContext { lookup_id: SingleLookupId, lookup_peers: Arc>>, block_root: Hash256, - ) -> Result { + ) -> Result>>, RpcRequestSendError> { let active_request_count_by_peer = self.active_request_count_by_peer(); let Some(peer_id) = lookup_peers .read() @@ -877,9 +877,10 @@ impl SyncNetworkContext { }, // Block is fully validated. If it's not yet imported it's waiting for missing block // components. Consider this request completed and do nothing. - BlockProcessStatus::ExecutionValidated { .. } => { + BlockProcessStatus::ExecutionValidated(block) => { return Ok(LookupRequestResult::NoRequestNeeded( "block execution validated", + block, )); } } @@ -942,12 +943,18 @@ impl SyncNetworkContext { lookup_id: SingleLookupId, lookup_peers: Arc>>, block_root: Hash256, - ) -> Result { + ) -> Result< + LookupRequestResult>>, + RpcRequestSendError, + > { // Skip the download if fork-choice already saw this envelope (e.g. imported via gossip - // before the lookup got here). - if self.chain.envelope_is_known_to_fork_choice(&block_root) { + // before the lookup got here). Return the cached envelope so the request completes. + if self.chain.envelope_is_known_to_fork_choice(&block_root) + && let Ok(Some(envelope)) = self.chain.get_payload_envelope(&block_root) + { return Ok(LookupRequestResult::NoRequestNeeded( "envelope already known to fork-choice", + Arc::new(envelope), )); } @@ -1016,7 +1023,7 @@ impl SyncNetworkContext { peer_id: PeerId, request: DataColumnsByRootSingleBlockRequest, expect_max_responses: bool, - ) -> Result, &'static str> { + ) -> Result, &'static str> { let id = DataColumnsByRootRequestId { id: self.next_id(), requester, @@ -1065,7 +1072,7 @@ impl SyncNetworkContext { block_root: Hash256, block_slot: Slot, lookup_peers: Arc>>, - ) -> Result { + ) -> Result>, RpcRequestSendError> { let custody_indexes_imported = self .chain .cached_data_column_indexes(&block_root, block_slot) @@ -1083,7 +1090,10 @@ impl SyncNetworkContext { if custody_indexes_to_fetch.is_empty() { // No indexes required, do not issue any request - return Ok(LookupRequestResult::NoRequestNeeded("no indices to fetch")); + return Ok(LookupRequestResult::NoRequestNeeded( + "no indices to fetch", + vec![], + )); } let id = SingleLookupReqId { @@ -1533,8 +1543,8 @@ impl SyncNetworkContext { // Convert a result from internal format of `ActiveCustodyRequest` (error first to use ?) to // an Option first to use in an `if let Some() { act on result }` block. match result.as_ref() { - Some(Ok((columns, peer_group, _))) => { - debug!(?id, count = columns.len(), peers = ?peer_group, "Custody request success, removing") + Some(Ok(data)) => { + debug!(?id, count = data.value.len(), peers = ?data.peer_group, "Custody request success, removing") } Some(Err(e)) => { debug!(?id, error = ?e, "Custody request failure, removing" ) diff --git a/beacon_node/network/src/sync/network_context/custody.rs b/beacon_node/network/src/sync/network_context/custody.rs index 2b96800e37..e74b74ec08 100644 --- a/beacon_node/network/src/sync/network_context/custody.rs +++ b/beacon_node/network/src/sync/network_context/custody.rs @@ -1,3 +1,4 @@ +use crate::sync::block_lookups::DownloadResult; use crate::sync::network_context::{ DataColumnsByRootRequestId, DataColumnsByRootSingleBlockRequest, }; @@ -56,8 +57,7 @@ struct ActiveBatchColumnsRequest { span: Span, } -pub type CustodyRequestResult = - Result, PeerGroup, Duration)>, Error>; +pub type CustodyRequestResult = Result>>, Error>; impl ActiveCustodyRequest { pub(crate) fn new( @@ -227,7 +227,11 @@ impl ActiveCustodyRequest { .into_iter() .max() .unwrap_or_else(|| cx.chain.slot_clock.now_duration().unwrap_or_default()); - return Ok(Some((columns, peer_group, max_seen_timestamp))); + return Ok(Some(DownloadResult::new( + columns, + peer_group, + max_seen_timestamp, + ))); } let active_request_count_by_peer = cx.active_request_count_by_peer(); @@ -343,7 +347,7 @@ impl ActiveCustodyRequest { }, ); } - LookupRequestResult::NoRequestNeeded(_) => unreachable!(), + LookupRequestResult::NoRequestNeeded(..) => unreachable!(), LookupRequestResult::Pending(_) => unreachable!(), } } diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index e6b81b8971..f1b65ce8ff 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -1,12 +1,14 @@ use super::*; use crate::NetworkMessage; +use crate::network_beacon_processor::BlockProcessingResult; +use crate::network_beacon_processor::sync_methods::WhichPeerToPenalize; use crate::network_beacon_processor::{ ChainSegmentProcessId, InvalidBlockStorage, NetworkBeaconProcessor, }; use crate::sync::block_lookups::{BlockLookupSummary, PARENT_DEPTH_TOLERANCE}; use crate::sync::{ SyncMessage, - manager::{BatchProcessResult, BlockProcessType, BlockProcessingResult, SyncManager}, + manager::{BatchProcessResult, BlockProcessType, SyncManager}, }; use beacon_chain::block_verification_types::LookupBlock; use beacon_chain::custody_context::NodeCustodyType; @@ -963,7 +965,6 @@ impl TestRig { // Add genesis block for completeness let genesis_block = external_harness.get_head_block(); - let genesis_block_root = genesis_block.canonical_root(); self.network_blocks_by_root .insert(genesis_block.canonical_root(), genesis_block.clone()); self.network_blocks_by_slot @@ -998,7 +999,6 @@ impl TestRig { } // Re-log to have a nice list of block roots at the end - self.log(&format!("Build chain (Slot(0), {genesis_block_root})")); for block in &blocks { self.log(&format!("Build chain {block:?}")); } @@ -1034,17 +1034,10 @@ impl TestRig { .data_columns() .expect("no columns"); let first = columns.first_mut().expect("empty columns"); - match Arc::make_mut(first) { - DataColumnSidecar::Fulu(col) => { - col.signed_block_header.signature = self.valid_signature(); - } - DataColumnSidecar::Gloas(_) => { - // Gloas columns don't carry a per-column proposer signature; the proposer - // signature lives in the block's bid. Leave the column unmodified — under - // `fake_crypto` the test still asserts a successful lookup with no penalty, - // which is the natural outcome when nothing is corrupted. - } - } + Arc::make_mut(first) + .signed_block_header_mut() + .expect("not fulu") + .signature = self.valid_signature(); self.re_insert_block(block, blobs, Some(columns)); } @@ -1397,10 +1390,6 @@ impl TestRig { // Test setup - fn new_after_deneb() -> Option { - genesis_fork().deneb_enabled().then(Self::default) - } - fn new_after_fulu() -> Option { genesis_fork().fulu_enabled().then(Self::default) } @@ -1427,10 +1416,6 @@ impl TestRig { info!(msg, "TEST_RIG"); } - pub fn is_after_deneb(&self) -> bool { - self.fork_name.deneb_enabled() - } - pub fn is_after_fulu(&self) -> bool { self.fork_name.fulu_enabled() } @@ -1897,18 +1882,14 @@ async fn happy_path_unknown_block_parent(depth: usize) { r.build_chain(depth).await; r.trigger_with_last_unknown_block_parent(); r.simulate(SimulateConfig::happy_path()).await; - // All lookups should NOT complete on this test, however note the following for the tip lookup, - // it's the lookup for the tip block which has 0 peers and a block cached: + // Note the following for the tip lookup, it's the lookup for the tip block which has 0 peers + // and a block cached: // - before deneb the block is cached, so it's sent for processing, and success - // - before fulu the block is cached, but we can't fetch blobs so it's stuck + // - deneb/electra the block is cached, so it's sent for processing, and success // - after fulu the block is cached, we start a custody request and since we use the global pool // of peers we DO have 1 connected synced supernode peer, which gives us the columns and the // lookup succeeds - if r.is_after_deneb() && !r.is_after_fulu() { - r.assert_successful_lookup_sync_parent_trigger() - } else { - r.assert_successful_lookup_sync(); - } + r.assert_successful_lookup_sync(); } /// Assert that sync completes from an UnknownDataColumnParent @@ -1941,7 +1922,7 @@ async fn happy_path_multiple_triggers(depth: usize) { if r.is_after_gloas() { // Gloas data columns reference their own block, not a parent, so there is no // unknown-parent-from-data trigger. The block triggers above already exercise dedup. - } else if r.is_after_fulu() { + } else { r.trigger_with_last_unknown_data_column_parent(); } r.simulate(SimulateConfig::happy_path()).await; @@ -1966,37 +1947,41 @@ async fn bad_peer_empty_block_response(depth: usize) { // TODO(tree-sync) Assert that a single lookup is created (no drops) } -/// Assert that if peer responds with no blobs / columns, we downscore, and retry the same lookup +/// Assert that if peer responds with no columns, we downscore, and retry the same lookup. async fn bad_peer_empty_data_response(depth: usize) { - let Some(mut r) = TestRig::new_after_deneb() else { + let Some(mut r) = TestRig::new_after_fulu() else { return; }; r.build_chain_and_trigger_last_block(depth).await; r.simulate(SimulateConfig::new().return_no_data_once()) .await; // We register a penalty, retry and complete sync successfully - if !(r.is_after_gloas() && depth == 1) { - // TODO(gloas): This test on gloas 1 depth has an empty peer set so we can't attribute fault to - // any peers and no-one is penalized + if !r.is_after_gloas() { + // TODO(gloas): the tip lookup's columns are only attributable to peers that imported a FULL + // child of the tip. The tip has no child here, so its column peer set is empty and the + // withholding peer can't be penalized. This holds at every depth, since the trigger always + // targets the tip. r.assert_penalties(&["NotEnoughResponsesReturned"]); } r.assert_successful_lookup_sync(); // TODO(tree-sync) Assert that a single lookup is created (no drops) } -/// Assert that if peer responds with not enough blobs / columns, we downscore, and retry the same -/// lookup +/// Assert that if peer responds with not enough columns, we downscore, and retry the same +/// lookup. async fn bad_peer_too_few_data_response(depth: usize) { - let Some(mut r) = TestRig::new_after_deneb() else { + let Some(mut r) = TestRig::new_after_fulu() else { return; }; r.build_chain_and_trigger_last_block(depth).await; r.simulate(SimulateConfig::new().return_too_few_data_once()) .await; // We register a penalty, retry and complete sync successfully - if !(r.is_after_gloas() && depth == 1) { - // TODO(gloas): This test on gloas 1 depth has an empty peer set so we can't attribute fault to - // any peers and no-one is penalized + if !r.is_after_gloas() { + // TODO(gloas): the tip lookup's columns are only attributable to peers that imported a FULL + // child of the tip. The tip has no child here, so its column peer set is empty and the + // withholding peer can't be penalized. This holds at every depth, since the trigger always + // targets the tip. r.assert_penalties(&["NotEnoughResponsesReturned"]); } r.assert_successful_lookup_sync(); @@ -2015,9 +2000,9 @@ async fn bad_peer_wrong_block_response(depth: usize) { // TODO(tree-sync) Assert that a single lookup is created (no drops) } -/// Assert that if peer responds with bad blobs / columns, we downscore, and retry the same lookup +/// Assert that if peer responds with bad columns, we downscore, and retry the same lookup. async fn bad_peer_wrong_data_response(depth: usize) { - let Some(mut r) = TestRig::new_after_deneb() else { + let Some(mut r) = TestRig::new_after_fulu() else { return; }; r.build_chain_and_trigger_last_block(depth).await; @@ -2068,10 +2053,11 @@ async fn too_many_processing_failures(depth: usize) { r.simulate( SimulateConfig::new().with_process_result(|| BlockProcessingResult::Error { penalty: Some(( - lighthouse_network::PeerAction::MidToleranceError, - crate::sync::manager::WhichPeerToPenalize::BlockPeer, + PeerAction::MidToleranceError, + WhichPeerToPenalize::BlockPeer, + "lookup_block_processing_failure", )), - reason: "lookup_block_processing_failure", + reason: "lookup_block_processing_failure".to_string(), }), ) .await; @@ -2123,22 +2109,21 @@ async fn unknown_parent_does_not_add_peers_to_itself() { } #[tokio::test] -/// Assert that if the beacon processor returns a processor-overloaded error, the lookup retries -/// without penalizing peers and eventually fails after MAX_ATTEMPTS. +/// Assert that a non-attributable processing error (e.g. processor overloaded) is retried up to +/// `SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS`, no peer is penalized, and the lookup is then dropped. async fn test_single_block_lookup_ignored_response() { let mut r = TestRig::default(); r.build_chain_and_trigger_last_block(1).await; - // Send a "processor overloaded" response repeatedly. Under the new model this is just an - // Error with no peer penalty; the lookup retries until MAX_ATTEMPTS, then drops. r.simulate( SimulateConfig::new().with_process_result(|| BlockProcessingResult::Error { penalty: None, - reason: "processor_overloaded", + reason: "processor_overloaded".to_string(), }), ) .await; // The block was not actually imported r.assert_head_slot(0); + r.assert_no_penalties(); assert_eq!(r.created_lookups(), 1, "no created lookups"); assert_eq!(r.dropped_lookups(), 1, "no dropped lookups"); assert_eq!(r.completed_lookups(), 0, "some completed lookups"); @@ -2161,7 +2146,8 @@ async fn test_single_block_lookup_duplicate_response() { r.build_chain_and_trigger_last_block(1).await; // Send a DuplicateFullyImported response, the lookup should complete successfully r.simulate( - SimulateConfig::new().with_process_result(|| BlockProcessingResult::Imported("duplicate")), + SimulateConfig::new() + .with_process_result(|| BlockProcessingResult::Imported(true, "duplicate")), ) .await; // The block was not actually imported @@ -2373,10 +2359,16 @@ async fn test_same_chain_race_condition() { #[tokio::test] /// Assert that if the lookup's block is in the da_checker we don't download it again async fn block_in_da_checker_skips_download() { - // Only in Deneb, as the block needs blobs to remain in the da_checker - let Some(mut r) = TestRig::new_after_deneb_before_fulu() else { + // Only post-Fulu, as the block needs custody columns to remain in the da_checker + let Some(mut r) = TestRig::new_after_fulu() else { return; }; + // TODO(gloas): a gloas block also needs its payload envelope to remain in the da_checker as + // missing-components; the harness helper only inserts the block + columns, so the gloas block + // never registers as missing-components. Skip until the helper donates an envelope. + if r.is_after_gloas() { + return; + } // Add block to da_checker // Complete test with happy path // Assert that there were no requests for blocks @@ -2438,31 +2430,6 @@ async fn block_in_processing_cache_becomes_valid_imported() { r.assert_no_active_lookups(); } -/// Test that lookups complete when the block is already fully imported. -/// Exercises the `NoRequestNeeded` → `Completed` download state path. -/// Without the fix, `on_completed_request` left the state as `AwaitingDownload` -/// causing an infinite re-check loop. -#[tokio::test] -async fn lookup_completes_when_block_already_imported() { - let mut r = TestRig::default(); - r.build_chain(1).await; - - // Fully import block 1 (this also imports its blobs/columns if any) - let block_root = r.block_root_at_slot(1); - r.import_block_by_root(block_root).await; - - // Now trigger a lookup for the SAME block via attestation. - // block_lookup_request → ExecutionValidated → NoRequestNeeded - // Without the Completed state fix, the lookup would hang. - r.trigger_with_block_at_slot(1); - assert!( - r.created_lookups() > 0, - "lookup must be created for this test to be valid" - ); - r.simulate(SimulateConfig::happy_path()).await; - r.assert_successful_lookup_sync(); -} - macro_rules! fulu_peer_matrix_tests { ( [$($name:ident => $variant:expr),+ $(,)?] @@ -2579,7 +2546,7 @@ async fn crypto_on_fail_with_invalid_block_signature() { r.assert_no_penalties(); } else { r.assert_failed_lookup_sync(); - r.assert_penalties_of_type("lookup_block_processing_failure"); + r.assert_penalties_of_type("InvalidSignature"); } } @@ -2602,7 +2569,7 @@ async fn crypto_on_fail_with_bad_column_proposer_signature() { r.assert_no_penalties(); } else { r.assert_failed_lookup_sync(); - r.assert_penalties_of_type("lookup_custody_column_processing_failure"); + r.assert_penalties_of_type("InvalidSignature"); } } @@ -2620,6 +2587,6 @@ async fn crypto_on_fail_with_bad_column_kzg_proof() { r.assert_no_penalties(); } else { r.assert_failed_lookup_sync(); - r.assert_penalties_of_type("lookup_custody_column_processing_failure"); + r.assert_penalties_of_type("AvailabilityCheck"); } } diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 647b5858cb..988e2d1fc5 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -387,6 +387,14 @@ pub fn cli_app() -> Command { .help("Disables the quic transport. The node will rely solely on the TCP transport for libp2p connections.") .display_order(0) ) + .arg( + Arg::new("enable-mplex") + .long("enable-mplex") + .action(ArgAction::SetTrue) + .help_heading(FLAG_HEADER) + .help("Enables mplex multiplexer alongside yamux. Yamux is preferred when both are available.") + .display_order(0) + ) .arg( Arg::new("disable-peer-scoring") .long("disable-peer-scoring") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 045b432dc9..ddf8d07c4e 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -1443,6 +1443,10 @@ pub fn set_network_config( config.disable_quic_support = true; } + if parse_flag(cli_args, "enable-mplex") { + config.enable_mplex = true; + } + if parse_flag(cli_args, "disable-upnp") { config.upnp_enabled = false; } diff --git a/book/src/help_bn.md b/book/src/help_bn.md index 30163f1f0c..1f57db1b59 100644 --- a/book/src/help_bn.md +++ b/book/src/help_bn.md @@ -494,6 +494,9 @@ Flags: Sets the local ENR IP address and port to match those set for lighthouse. Specifically, the IP address will be the value of --listen-address and the UDP port will be --discovery-port. + --enable-mplex + Enables mplex multiplexer alongside yamux. Yamux is preferred when + both are available. --enable-partial-columns Enable partial messages for data columns. This can reduce the amount of data sent over the network. Enabled by default on Hoodi and diff --git a/consensus/types/src/block/signed_beacon_block.rs b/consensus/types/src/block/signed_beacon_block.rs index 1a87a519d0..1ade0f82a3 100644 --- a/consensus/types/src/block/signed_beacon_block.rs +++ b/consensus/types/src/block/signed_beacon_block.rs @@ -361,6 +361,14 @@ impl> SignedBeaconBlock .unwrap_or(0) } + pub fn parent_block_hash(&self) -> Option { + self.message() + .body() + .signed_execution_payload_bid() + .ok() + .map(|bid| bid.message.parent_block_hash) + } + /// Used for displaying commitments in logs. pub fn commitments_formatted(&self) -> String { let Ok(commitments) = self.message().body().blob_kzg_commitments() else {