From 3992d6ba74c97cc66b5b1753b1ed41c761464577 Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Tue, 11 Feb 2025 03:07:13 -0300 Subject: [PATCH] Fix misc PeerDAS todos (#6862) Address misc PeerDAS TODOs that are too small to warrant a dedicated PR. I'll justify each TODO in an inline comment. --- beacon_node/beacon_chain/src/beacon_chain.rs | 3 +- .../overflow_lru_cache.rs | 12 +------ .../gossip_methods.rs | 20 ++++------- .../network_beacon_processor/sync_methods.rs | 24 ++++++++++++- .../network/src/sync/block_lookups/mod.rs | 4 +-- beacon_node/network/src/sync/manager.rs | 4 +-- .../network/src/sync/network_context.rs | 18 +++++----- .../src/sync/network_context/custody.rs | 34 ++++++++++++------- beacon_node/network/src/sync/tests/lookups.rs | 2 -- .../types/src/data_column_custody_group.rs | 2 ++ 10 files changed, 68 insertions(+), 55 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index ca21b519f1..6d46aaabe8 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2974,10 +2974,9 @@ impl BeaconChain { /// Only completed sampling results are received. Blocks are unavailable by default and should /// be pruned on finalization, on a timeout or by a max count. pub async fn process_sampling_completed(self: &Arc, block_root: Hash256) { - // TODO(das): update fork-choice + // TODO(das): update fork-choice, act on sampling result, adjust log level // NOTE: It is possible that sampling complets before block is imported into fork choice, // in that case we may need to update availability cache. 
- // TODO(das): These log levels are too high, reduce once DAS matures info!(self.log, "Sampling completed"; "block_root" => %block_root); } diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index cd793c8394..7592ffd149 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -110,15 +110,6 @@ impl PendingComponents { self.get_cached_blobs().iter().flatten().count() } - /// Checks if a data column of a given index exists in the cache. - /// - /// Returns: - /// - `true` if a data column for the given index exists. - /// - `false` otherwise. - fn data_column_exists(&self, data_column_index: u64) -> bool { - self.get_cached_data_column(data_column_index).is_some() - } - /// Returns the number of data columns that have been received and are stored in the cache. pub fn num_received_data_columns(&self) -> usize { self.verified_data_columns.len() @@ -182,8 +173,7 @@ impl PendingComponents { kzg_verified_data_columns: I, ) -> Result<(), AvailabilityCheckError> { for data_column in kzg_verified_data_columns { - // TODO(das): Add equivalent checks for data columns if necessary - if !self.data_column_exists(data_column.index()) { + if self.get_cached_data_column(data_column.index()).is_none() { self.verified_data_columns.push(data_column); } } diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 090b963cbc..4338bfbc89 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1477,22 +1477,13 @@ impl NetworkBeaconProcessor { ); return None; } - Err(e @ BlockError::InternalError(_)) => { + // BlobNotRequired is unreachable. 
Only constructed in `process_gossip_blob` + Err(e @ BlockError::InternalError(_)) | Err(e @ BlockError::BlobNotRequired(_)) => { error!(self.log, "Internal block gossip validation error"; "error" => %e ); return None; } - Err(e @ BlockError::BlobNotRequired(_)) => { - // TODO(das): penalty not implemented yet as other clients may still send us blobs - // during early stage of implementation. - debug!(self.log, "Received blobs for slot after PeerDAS epoch from peer"; - "error" => %e, - "peer_id" => %peer_id, - ); - self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); - return None; - } }; metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL); @@ -1603,9 +1594,10 @@ impl NetworkBeaconProcessor { let block = verified_block.block.block_cloned(); let block_root = verified_block.block_root; - // TODO(das) Might be too early to issue a request here. We haven't checked that the block - // actually includes blob transactions and thus has data. A peer could send a block is - // garbage commitments, and make us trigger sampling for a block that does not have data. + // Note: okay to issue sampling request before the block is execution verified. If the + // proposer sends us a block with invalid blob transactions it can trigger us to issue + // sampling queries that will never resolve. This attack is equivalent to withholding data. + // Dismissed proposal to move this block to post-execution: https://github.com/sigp/lighthouse/pull/6492 if block.num_expected_blobs() > 0 { // Trigger sampling for block not yet execution valid. At this point column custodials are // unlikely to have received their columns. 
Triggering sampling so early is only viable with diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 338f2bc4c8..f5fe7ee98b 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -336,9 +336,31 @@ impl NetworkBeaconProcessor { self: Arc>, block_root: Hash256, custody_columns: DataColumnSidecarList, - _seen_timestamp: Duration, + seen_timestamp: Duration, process_type: BlockProcessType, ) { + // custody_columns must always have at least one element + let Some(slot) = custody_columns.first().map(|d| d.slot()) else { + return; + }; + + if let Ok(current_slot) = self.chain.slot() { + if current_slot == slot { + let delay = get_slot_delay_ms(seen_timestamp, slot, &self.chain.slot_clock); + metrics::observe_duration(&metrics::BEACON_BLOB_RPC_SLOT_START_DELAY_TIME, delay); + } + } + + let mut indices = custody_columns.iter().map(|d| d.index).collect::>(); + indices.sort_unstable(); + debug!( + self.log, + "RPC custody data columns received"; + "indices" => ?indices, + "block_root" => %block_root, + "slot" => %slot, + ); + let mut result = self .chain .process_rpc_custody_columns(custody_columns) diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 2172c8dcd8..a29f9cf402 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -479,8 +479,8 @@ impl BlockLookups { // continue_request will send for processing as the request state is AwaitingProcessing } Err(e) => { - // TODO(das): is it okay to not log the peer source of request failures? Then we - // should log individual requests failures in the SyncNetworkContext + // No need to log peer source here. 
When sending a DataColumnsByRoot request we log + // the peer and the request ID which is linked to this `id` value here. debug!(self.log, "Received lookup download failure"; "block_root" => ?block_root, diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 14702d3536..a9e5f646cc 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -1217,12 +1217,10 @@ impl SyncManager { requester: CustodyRequester, response: CustodyByRootResult, ) { - // TODO(das): get proper timestamp - let seen_timestamp = timestamp_now(); self.block_lookups .on_download_response::>( requester.0, - response.map(|(columns, peer_group)| (columns, peer_group, seen_timestamp)), + response, &mut self.network, ); } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index b03a446add..968a9bcddd 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -68,7 +68,9 @@ impl RpcEvent { pub type RpcResponseResult = Result<(T, Duration), RpcResponseError>; -pub type CustodyByRootResult = Result<(DataColumnSidecarList, PeerGroup), RpcResponseError>; +/// Duration = latest seen timestamp of all received data columns +pub type CustodyByRootResult = + Result<(DataColumnSidecarList, PeerGroup, Duration), RpcResponseError>; #[derive(Debug)] pub enum RpcResponseError { @@ -1190,7 +1192,7 @@ impl SyncNetworkContext { // Convert a result from internal format of `ActiveCustodyRequest` (error first to use ?) to // an Option first to use in an `if let Some() { act on result }` block. 
match result.as_ref() { - Some(Ok((columns, peer_group))) => { + Some(Ok((columns, peer_group, _))) => { debug!(self.log, "Custody request success, removing"; "id" => ?id, "count" => columns.len(), "peers" => ?peer_group) } Some(Err(e)) => { @@ -1208,7 +1210,7 @@ impl SyncNetworkContext { id: Id, block_root: Hash256, block: RpcBlock, - duration: Duration, + seen_timestamp: Duration, ) -> Result<(), SendErrorProcessor> { let beacon_processor = self .beacon_processor_if_enabled() @@ -1221,7 +1223,7 @@ impl SyncNetworkContext { .send_rpc_beacon_block( block_root, block, - duration, + seen_timestamp, BlockProcessType::SingleBlock { id }, ) .map_err(|e| { @@ -1239,7 +1241,7 @@ impl SyncNetworkContext { id: Id, block_root: Hash256, blobs: FixedBlobSidecarList, - duration: Duration, + seen_timestamp: Duration, ) -> Result<(), SendErrorProcessor> { let beacon_processor = self .beacon_processor_if_enabled() @@ -1252,7 +1254,7 @@ impl SyncNetworkContext { .send_rpc_blobs( block_root, blobs, - duration, + seen_timestamp, BlockProcessType::SingleBlob { id }, ) .map_err(|e| { @@ -1270,7 +1272,7 @@ impl SyncNetworkContext { _id: Id, block_root: Hash256, custody_columns: DataColumnSidecarList, - duration: Duration, + seen_timestamp: Duration, process_type: BlockProcessType, ) -> Result<(), SendErrorProcessor> { let beacon_processor = self @@ -1280,7 +1282,7 @@ impl SyncNetworkContext { debug!(self.log, "Sending custody columns for processing"; "block" => ?block_root, "process_type" => ?process_type); beacon_processor - .send_rpc_custody_columns(block_root, custody_columns, duration, process_type) + .send_rpc_custody_columns(block_root, custody_columns, seen_timestamp, process_type) .map_err(|e| { error!( self.log, diff --git a/beacon_node/network/src/sync/network_context/custody.rs b/beacon_node/network/src/sync/network_context/custody.rs index 8a29545c21..38353d3ea2 100644 --- a/beacon_node/network/src/sync/network_context/custody.rs +++ 
b/beacon_node/network/src/sync/network_context/custody.rs @@ -1,7 +1,7 @@ use crate::sync::network_context::{ DataColumnsByRootRequestId, DataColumnsByRootSingleBlockRequest, }; - +use beacon_chain::validator_monitor::timestamp_now; use beacon_chain::BeaconChainTypes; use fnv::FnvHashMap; use lighthouse_network::service::api_types::{CustodyId, DataColumnsByRootRequester}; @@ -61,7 +61,8 @@ struct ActiveBatchColumnsRequest { indices: Vec, } -pub type CustodyRequestResult = Result, PeerGroup)>, Error>; +pub type CustodyRequestResult = + Result, PeerGroup, Duration)>, Error>; impl ActiveCustodyRequest { pub(crate) fn new( @@ -102,8 +103,6 @@ impl ActiveCustodyRequest { resp: RpcResponseResult>, cx: &mut SyncNetworkContext, ) -> CustodyRequestResult { - // TODO(das): Should downscore peers for verify errors here - let Some(batch_request) = self.active_batch_columns_requests.get_mut(&req_id) else { warn!(self.log, "Received custody column response for unrequested index"; @@ -115,7 +114,7 @@ impl ActiveCustodyRequest { }; match resp { - Ok((data_columns, _seen_timestamp)) => { + Ok((data_columns, seen_timestamp)) => { debug!(self.log, "Custody column download success"; "id" => ?self.custody_id, @@ -141,7 +140,12 @@ impl ActiveCustodyRequest { .ok_or(Error::BadState("unknown column_index".to_owned()))?; if let Some(data_column) = data_columns.remove(column_index) { - column_request.on_download_success(req_id, peer_id, data_column)?; + column_request.on_download_success( + req_id, + peer_id, + data_column, + seen_timestamp, + )?; } else { // Peer does not have the requested data. // TODO(das) do not consider this case a success. We know for sure the block has @@ -204,20 +208,23 @@ impl ActiveCustodyRequest { if self.column_requests.values().all(|r| r.is_downloaded()) { // All requests have completed successfully. 
let mut peers = HashMap::>::new(); + let mut seen_timestamps = vec![]; let columns = std::mem::take(&mut self.column_requests) .into_values() .map(|request| { - let (peer, data_column) = request.complete()?; + let (peer, data_column, seen_timestamp) = request.complete()?; peers .entry(peer) .or_default() .push(data_column.index as usize); + seen_timestamps.push(seen_timestamp); Ok(data_column) }) .collect::, _>>()?; let peer_group = PeerGroup::from_set(peers); - return Ok(Some((columns, peer_group))); + let max_seen_timestamp = seen_timestamps.into_iter().max().unwrap_or(timestamp_now()); + return Ok(Some((columns, peer_group, max_seen_timestamp))); } let mut columns_to_request_by_peer = HashMap::>::new(); @@ -335,7 +342,7 @@ struct ColumnRequest { enum Status { NotStarted(Instant), Downloading(DataColumnsByRootRequestId), - Downloaded(PeerId, Arc>), + Downloaded(PeerId, Arc>, Duration), } impl ColumnRequest { @@ -404,6 +411,7 @@ impl ColumnRequest { req_id: DataColumnsByRootRequestId, peer_id: PeerId, data_column: Arc>, + seen_timestamp: Duration, ) -> Result<(), Error> { match &self.status { Status::Downloading(expected_req_id) => { @@ -413,7 +421,7 @@ impl ColumnRequest { req_id, }); } - self.status = Status::Downloaded(peer_id, data_column); + self.status = Status::Downloaded(peer_id, data_column, seen_timestamp); Ok(()) } other => Err(Error::BadState(format!( @@ -422,9 +430,11 @@ impl ColumnRequest { } } - fn complete(self) -> Result<(PeerId, Arc>), Error> { + fn complete(self) -> Result<(PeerId, Arc>, Duration), Error> { match self.status { - Status::Downloaded(peer_id, data_column) => Ok((peer_id, data_column)), + Status::Downloaded(peer_id, data_column, seen_timestamp) => { + Ok((peer_id, data_column, seen_timestamp)) + } other => Err(Error::BadState(format!( "bad state complete expected Downloaded got {other:?}" ))), diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index ea20141df6..10117285eb 
100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -713,7 +713,6 @@ impl TestRig { self.complete_data_columns_by_root_request(id, data_columns); // Expect work event - // TODO(das): worth it to append sender id to the work event for stricter assertion? self.expect_rpc_sample_verify_work_event(); // Respond with valid result @@ -755,7 +754,6 @@ impl TestRig { } // Expect work event - // TODO(das): worth it to append sender id to the work event for stricter assertion? self.expect_rpc_custody_column_work_event(); // Respond with valid result diff --git a/consensus/types/src/data_column_custody_group.rs b/consensus/types/src/data_column_custody_group.rs index bb204c34a2..9e9505da9f 100644 --- a/consensus/types/src/data_column_custody_group.rs +++ b/consensus/types/src/data_column_custody_group.rs @@ -17,6 +17,8 @@ pub enum DataColumnCustodyGroupError { /// The `get_custody_groups` function is used to determine the custody groups that a node is /// assigned to. /// +/// Note: `get_custody_groups(node_id, x)` is a subset of `get_custody_groups(node_id, y)` if `x < y`. +/// /// spec: https://github.com/ethereum/consensus-specs/blob/8e0d0d48e81d6c7c5a8253ab61340f5ea5bac66a/specs/fulu/das-core.md#get_custody_groups pub fn get_custody_groups( raw_node_id: [u8; 32],