From d511ca049422009c4ba0c2d7068981ef513f0ae8 Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Mon, 7 Apr 2025 00:16:41 -0300 Subject: [PATCH] Compute roots for unfinalized by_range requests with fork-choice (#7098) Includes PRs - https://github.com/sigp/lighthouse/pull/7058 - https://github.com/sigp/lighthouse/pull/7066 Cleaner for the `release-v7.0.0` branch --- beacon_node/beacon_chain/src/beacon_chain.rs | 25 ++ beacon_node/http_api/Cargo.toml | 1 + beacon_node/network/src/metrics.rs | 9 + .../network_beacon_processor/rpc_methods.rs | 361 ++++++++---------- .../src/proto_array_fork_choice.rs | 10 +- 5 files changed, 209 insertions(+), 197 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 624dc968ad..42e6deaf16 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -7344,6 +7344,31 @@ impl BeaconChain { Ok(None) } + + /// Retrieves block roots (in ascending slot order) within some slot range from fork choice. + pub fn block_roots_from_fork_choice(&self, start_slot: u64, count: u64) -> Vec { + let head_block_root = self.canonical_head.cached_head().head_block_root(); + let fork_choice_read_lock = self.canonical_head.fork_choice_read_lock(); + let block_roots_iter = fork_choice_read_lock + .proto_array() + .iter_block_roots(&head_block_root); + let end_slot = start_slot.saturating_add(count); + let mut roots = vec![]; + + for (root, slot) in block_roots_iter { + if slot < end_slot && slot >= start_slot { + roots.push(root); + } + if slot < start_slot { + break; + } + } + + drop(fork_choice_read_lock); + // return in ascending slot order + roots.reverse(); + roots + } } impl Drop for BeaconChain { diff --git a/beacon_node/http_api/Cargo.toml b/beacon_node/http_api/Cargo.toml index 2fb3ec06bf..7861f03000 100644 --- a/beacon_node/http_api/Cargo.toml +++ b/beacon_node/http_api/Cargo.toml @@ -28,6 +28,7 @@ metrics = { workspace = true } network = { workspace = true } operation_pool = { workspace = true } parking_lot = { workspace = true } +proto_array = { workspace = true } rand = { workspace = true } safe_arith = { workspace = true } sensitive_url = { workspace = true } diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index 154a59eade..92b3349577 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -87,6 +87,15 @@ pub static BEACON_PROCESSOR_IMPORT_ERRORS_PER_TYPE: LazyLock> = + LazyLock::new(|| { + try_create_histogram_vec_with_buckets( + "beacon_processor_get_block_roots_time_seconds", + "Time to complete get_block_roots when serving by_range requests", + decimal_buckets(-3, -1), + &["source"], + ) + }); /* * Gossip processor diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 857fc266da..dec28eeb72 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -1,9 +1,10 @@ +use crate::metrics; use crate::network_beacon_processor::{NetworkBeaconProcessor, FUTURE_SLOT_TOLERANCE}; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use crate::sync::SyncMessage; use beacon_chain::{BeaconChainError, BeaconChainTypes, WhenSlotSkipped}; -use itertools::process_results; +use itertools::{process_results, Itertools}; use lighthouse_network::discovery::ConnectionId; use lighthouse_network::rpc::methods::{ BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest, @@ -675,86 +676,49 @@ impl NetworkBeaconProcessor { request_id: RequestId, req: BlocksByRangeRequest, ) -> Result<(), (RpcErrorResponse, &'static str)> { + let req_start_slot = *req.start_slot(); + let req_count = *req.count(); + debug!(self.log, "Received BlocksByRange Request"; "peer_id" => %peer_id, - "count" => req.count(), - "start_slot" => req.start_slot(), + "start_slot" => req_start_slot, + "count" => req_count, ); - let forwards_block_root_iter = match self - .chain - .forwards_iter_block_roots(Slot::from(*req.start_slot())) - { - Ok(iter) => iter, - Err(BeaconChainError::HistoricalBlockOutOfRange { - slot, - oldest_block_slot, - }) => { - debug!(self.log, "Range request failed during backfill"; - "requested_slot" => slot, - "oldest_known_slot" => oldest_block_slot - ); - return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling")); - } - Err(e) => { - error!(self.log, "Unable to obtain root iter"; - "request" => ?req, - "peer" => %peer_id, - "error" => ?e - ); - return Err((RpcErrorResponse::ServerError, "Database error")); - } - }; - - // Pick out the required blocks, ignoring skip-slots. - let mut last_block_root = None; - let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { - iter.take_while(|(_, slot)| { - slot.as_u64() < req.start_slot().saturating_add(*req.count()) - }) - // map skip slots to None - .map(|(root, _)| { - let result = if Some(root) == last_block_root { - None - } else { - Some(root) - }; - last_block_root = Some(root); - result - }) - .collect::>>() - }); - - let block_roots = match maybe_block_roots { - Ok(block_roots) => block_roots, - Err(e) => { - error!(self.log, "Error during iteration over blocks"; - "request" => ?req, - "peer" => %peer_id, - "error" => ?e - ); - return Err((RpcErrorResponse::ServerError, "Iteration error")); - } - }; - - // remove all skip slots - let block_roots = block_roots.into_iter().flatten().collect::>(); + // Spawn a blocking handle since get_block_roots_for_slot_range takes a sync lock on the + // fork-choice. + let network_beacon_processor = self.clone(); + let block_roots = self + .executor + .spawn_blocking_handle( + move || { + network_beacon_processor.get_block_roots_for_slot_range( + req_start_slot, + req_count, + "BlocksByRange", + ) + }, + "get_block_roots_for_slot_range", + ) + .ok_or((RpcErrorResponse::ServerError, "shutting down"))? + .await + .map_err(|_| (RpcErrorResponse::ServerError, "tokio join"))??; let current_slot = self .chain .slot() .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()); - let log_results = |req: BlocksByRangeRequest, peer_id, blocks_sent| { - if blocks_sent < (*req.count() as usize) { + let log_results = |peer_id, blocks_sent| { + if blocks_sent < (req_count as usize) { debug!( self.log, "BlocksByRange outgoing response processed"; "peer" => %peer_id, "msg" => "Failed to return all requested blocks", - "start_slot" => req.start_slot(), + "start_slot" => req_start_slot, "current_slot" => current_slot, - "requested" => req.count(), + "requested" => req_count, "returned" => blocks_sent ); } else { @@ -762,9 +726,9 @@ impl NetworkBeaconProcessor { self.log, "BlocksByRange outgoing response processed"; "peer" => %peer_id, - "start_slot" => req.start_slot(), + "start_slot" => req_start_slot, "current_slot" => current_slot, - "requested" => req.count(), + "requested" => req_count, "returned" => blocks_sent ); } @@ -785,8 +749,7 @@ impl NetworkBeaconProcessor { Ok(Some(block)) => { // Due to skip slots, blocks could be out of the range, we ensure they // are in the range before sending - if block.slot() >= *req.start_slot() - && block.slot() < req.start_slot() + req.count() + if block.slot() >= req_start_slot && block.slot() < req_start_slot + req.count() { blocks_sent += 1; self.send_network_message(NetworkMessage::SendResponse { @@ -805,7 +768,7 @@ impl NetworkBeaconProcessor { "peer" => %peer_id, "request_root" => ?root ); - log_results(req, peer_id, blocks_sent); + log_results(peer_id, blocks_sent); return Err((RpcErrorResponse::ServerError, "Database inconsistency")); } Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => { @@ -815,7 +778,7 @@ impl NetworkBeaconProcessor { "block_root" => ?root, "reason" => "execution layer not synced", ); - log_results(req, peer_id, blocks_sent); + log_results(peer_id, blocks_sent); // send the stream terminator return Err(( RpcErrorResponse::ResourceUnavailable, @@ -843,17 +806,142 @@ impl NetworkBeaconProcessor { "error" => ?e ); } - log_results(req, peer_id, blocks_sent); + log_results(peer_id, blocks_sent); // send the stream terminator return Err((RpcErrorResponse::ServerError, "Failed fetching blocks")); } } } - log_results(req, peer_id, blocks_sent); + log_results(peer_id, blocks_sent); Ok(()) } + fn get_block_roots_for_slot_range( + &self, + req_start_slot: u64, + req_count: u64, + req_type: &str, + ) -> Result, (RpcErrorResponse, &'static str)> { + let start_time = std::time::Instant::now(); + let finalized_slot = self + .chain + .canonical_head + .cached_head() + .finalized_checkpoint() + .epoch + .start_slot(T::EthSpec::slots_per_epoch()); + + let (block_roots, source) = if req_start_slot >= finalized_slot.as_u64() { + // If the entire requested range is after finalization, use fork_choice + ( + self.chain + .block_roots_from_fork_choice(req_start_slot, req_count), + "fork_choice", + ) + } else if req_start_slot + req_count <= finalized_slot.as_u64() { + // If the entire requested range is before finalization, use store + ( + self.get_block_roots_from_store(req_start_slot, req_count)?, + "store", + ) + } else { + // Split the request at the finalization boundary + let count_from_store = finalized_slot.as_u64() - req_start_slot; + let count_from_fork_choice = req_count - count_from_store; + let start_slot_fork_choice = finalized_slot.as_u64(); + + // Get roots from store (up to and including finalized slot) + let mut roots_from_store = + self.get_block_roots_from_store(req_start_slot, count_from_store)?; + + // Get roots from fork choice (after finalized slot) + let roots_from_fork_choice = self + .chain + .block_roots_from_fork_choice(start_slot_fork_choice, count_from_fork_choice); + + roots_from_store.extend(roots_from_fork_choice); + + (roots_from_store, "mixed") + }; + + let elapsed = start_time.elapsed(); + metrics::observe_timer_vec( + &metrics::BEACON_PROCESSOR_GET_BLOCK_ROOTS_TIME, + &[source], + elapsed, + ); + + debug!( + self.log, + "Range request block roots retrieved"; + "req_type" => req_type, + "start_slot" => req_start_slot, + "req_count" => req_count, + "roots_count" => block_roots.len(), + "source" => source, + "elapsed" => ?elapsed, + "finalized_slot" => finalized_slot + ); + + Ok(block_roots) + } + + /// Get block roots for a `BlocksByRangeRequest` from the store using roots iterator. + fn get_block_roots_from_store( + &self, + start_slot: u64, + count: u64, + ) -> Result, (RpcErrorResponse, &'static str)> { + let forwards_block_root_iter = + match self.chain.forwards_iter_block_roots(Slot::from(start_slot)) { + Ok(iter) => iter, + Err(BeaconChainError::HistoricalBlockOutOfRange { + slot, + oldest_block_slot, + }) => { + debug!(self.log, "Range request failed during backfill"; + "requested_slot" => slot, + "oldest_known_slot" => oldest_block_slot + ); + return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling")); + } + Err(e) => { + error!(self.log, "Unable to obtain root iter for range request"; + "start_slot" => start_slot, + "count" => count, + "error" => ?e + ); + return Err((RpcErrorResponse::ServerError, "Database error")); + } + }; + + // Pick out the required blocks, ignoring skip-slots. + let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { + iter.take_while(|(_, slot)| slot.as_u64() < start_slot.saturating_add(count)) + .collect::>() + }); + + let block_roots = match maybe_block_roots { + Ok(block_roots) => block_roots, + Err(e) => { + error!(self.log, "Error during iteration over blocks for range request"; + "start_slot" => start_slot, + "count" => count, + "error" => ?e + ); + return Err((RpcErrorResponse::ServerError, "Iteration error")); + } + }; + + // remove all skip slots i.e. duplicated roots + Ok(block_roots + .into_iter() + .map(|(root, _)| root) + .unique() + .collect::>()) + } + /// Handle a `BlobsByRange` request from the peer. pub fn handle_blobs_by_range_request( self: Arc, @@ -932,65 +1020,8 @@ impl NetworkBeaconProcessor { }; } - let forwards_block_root_iter = - match self.chain.forwards_iter_block_roots(request_start_slot) { - Ok(iter) => iter, - Err(BeaconChainError::HistoricalBlockOutOfRange { - slot, - oldest_block_slot, - }) => { - debug!(self.log, "Range request failed during backfill"; - "requested_slot" => slot, - "oldest_known_slot" => oldest_block_slot - ); - return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling")); - } - Err(e) => { - error!(self.log, "Unable to obtain root iter"; - "request" => ?req, - "peer" => %peer_id, - "error" => ?e - ); - return Err((RpcErrorResponse::ServerError, "Database error")); - } - }; - - // Use `WhenSlotSkipped::Prev` to get the most recent block root prior to - // `request_start_slot` in order to check whether the `request_start_slot` is a skip. - let mut last_block_root = req.start_slot.checked_sub(1).and_then(|prev_slot| { - self.chain - .block_root_at_slot(Slot::new(prev_slot), WhenSlotSkipped::Prev) - .ok() - .flatten() - }); - - // Pick out the required blocks, ignoring skip-slots. - let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { - iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count)) - // map skip slots to None - .map(|(root, _)| { - let result = if Some(root) == last_block_root { - None - } else { - Some(root) - }; - last_block_root = Some(root); - result - }) - .collect::>>() - }); - - let block_roots = match maybe_block_roots { - Ok(block_roots) => block_roots, - Err(e) => { - error!(self.log, "Error during iteration over blocks"; - "request" => ?req, - "peer" => %peer_id, - "error" => ?e - ); - return Err((RpcErrorResponse::ServerError, "Database error")); - } - }; + let block_roots = + self.get_block_roots_for_slot_range(req.start_slot, req.count, "BlobsByRange")?; let current_slot = self .chain @@ -1009,8 +1040,6 @@ impl NetworkBeaconProcessor { ); }; - // remove all skip slots - let block_roots = block_roots.into_iter().flatten(); let mut blobs_sent = 0; for root in block_roots { @@ -1136,68 +1165,8 @@ impl NetworkBeaconProcessor { }; } - let forwards_block_root_iter = - match self.chain.forwards_iter_block_roots(request_start_slot) { - Ok(iter) => iter, - Err(BeaconChainError::HistoricalBlockOutOfRange { - slot, - oldest_block_slot, - }) => { - debug!(self.log, "Range request failed during backfill"; - "requested_slot" => slot, - "oldest_known_slot" => oldest_block_slot - ); - return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling")); - } - Err(e) => { - error!(self.log, "Unable to obtain root iter"; - "request" => ?req, - "peer" => %peer_id, - "error" => ?e - ); - return Err((RpcErrorResponse::ServerError, "Database error")); - } - }; - - // Use `WhenSlotSkipped::Prev` to get the most recent block root prior to - // `request_start_slot` in order to check whether the `request_start_slot` is a skip. - let mut last_block_root = req.start_slot.checked_sub(1).and_then(|prev_slot| { - self.chain - .block_root_at_slot(Slot::new(prev_slot), WhenSlotSkipped::Prev) - .ok() - .flatten() - }); - - // Pick out the required blocks, ignoring skip-slots. - let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { - iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count)) - // map skip slots to None - .map(|(root, _)| { - let result = if Some(root) == last_block_root { - None - } else { - Some(root) - }; - last_block_root = Some(root); - result - }) - .collect::>>() - }); - - let block_roots = match maybe_block_roots { - Ok(block_roots) => block_roots, - Err(e) => { - error!(self.log, "Error during iteration over blocks"; - "request" => ?req, - "peer" => %peer_id, - "error" => ?e - ); - return Err((RpcErrorResponse::ServerError, "Database error")); - } - }; - - // remove all skip slots - let block_roots = block_roots.into_iter().flatten(); + let block_roots = + self.get_block_roots_for_slot_range(req.start_slot, req.count, "DataColumnsByRange")?; let mut data_columns_sent = 0; for root in block_roots { diff --git a/consensus/proto_array/src/proto_array_fork_choice.rs b/consensus/proto_array/src/proto_array_fork_choice.rs index 4da632bf58..e7e6b54f1d 100644 --- a/consensus/proto_array/src/proto_array_fork_choice.rs +++ b/consensus/proto_array/src/proto_array_fork_choice.rs @@ -856,10 +856,18 @@ impl ProtoArrayForkChoice { } /// See `ProtoArray::iter_nodes` - pub fn iter_nodes<'a>(&'a self, block_root: &Hash256) -> Iter<'a> { + pub fn iter_nodes(&self, block_root: &Hash256) -> Iter { self.proto_array.iter_nodes(block_root) } + /// See `ProtoArray::iter_block_roots` + pub fn iter_block_roots( + &self, + block_root: &Hash256, + ) -> impl Iterator + use<'_> { + self.proto_array.iter_block_roots(block_root) + } + pub fn as_bytes(&self) -> Vec { SszContainer::from(self).as_ssz_bytes() }