diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index ebd0ef186a..f7f186704f 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -7282,6 +7282,31 @@ impl BeaconChain { Ok(None) } + + /// Retrieves block roots (in ascending slot order) within some slot range from fork choice. + pub fn block_roots_from_fork_choice(&self, start_slot: u64, count: u64) -> Vec { + let head_block_root = self.canonical_head.cached_head().head_block_root(); + let fork_choice_read_lock = self.canonical_head.fork_choice_read_lock(); + let block_roots_iter = fork_choice_read_lock + .proto_array() + .iter_block_roots(&head_block_root); + let end_slot = start_slot.saturating_add(count); + let mut roots = vec![]; + + for (root, slot) in block_roots_iter { + if slot < end_slot && slot >= start_slot { + roots.push(root); + } + if slot < start_slot { + break; + } + } + + drop(fork_choice_read_lock); + // return in ascending slot order + roots.reverse(); + roots + } } impl Drop for BeaconChain { diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 857fc266da..341177e137 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -675,86 +675,32 @@ impl NetworkBeaconProcessor { request_id: RequestId, req: BlocksByRangeRequest, ) -> Result<(), (RpcErrorResponse, &'static str)> { + let req_start_slot = *req.start_slot(); + let req_count = *req.count(); + debug!(self.log, "Received BlocksByRange Request"; "peer_id" => %peer_id, - "count" => req.count(), - "start_slot" => req.start_slot(), + "start_slot" => req_start_slot, + "count" => req_count, ); - let forwards_block_root_iter = match self - .chain - .forwards_iter_block_roots(Slot::from(*req.start_slot())) - { - Ok(iter) => iter, - Err(BeaconChainError::HistoricalBlockOutOfRange { - slot, - oldest_block_slot, - }) => { - debug!(self.log, "Range request failed during backfill"; - "requested_slot" => slot, - "oldest_known_slot" => oldest_block_slot - ); - return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling")); - } - Err(e) => { - error!(self.log, "Unable to obtain root iter"; - "request" => ?req, - "peer" => %peer_id, - "error" => ?e - ); - return Err((RpcErrorResponse::ServerError, "Database error")); - } - }; - - // Pick out the required blocks, ignoring skip-slots. - let mut last_block_root = None; - let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { - iter.take_while(|(_, slot)| { - slot.as_u64() < req.start_slot().saturating_add(*req.count()) - }) - // map skip slots to None - .map(|(root, _)| { - let result = if Some(root) == last_block_root { - None - } else { - Some(root) - }; - last_block_root = Some(root); - result - }) - .collect::>>() - }); - - let block_roots = match maybe_block_roots { - Ok(block_roots) => block_roots, - Err(e) => { - error!(self.log, "Error during iteration over blocks"; - "request" => ?req, - "peer" => %peer_id, - "error" => ?e - ); - return Err((RpcErrorResponse::ServerError, "Iteration error")); - } - }; - - // remove all skip slots - let block_roots = block_roots.into_iter().flatten().collect::>(); - + let block_roots = + self.get_block_roots_for_slot_range(req_start_slot, req_count, "BlocksByRange")?; let current_slot = self .chain .slot() .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()); - let log_results = |req: BlocksByRangeRequest, peer_id, blocks_sent| { - if blocks_sent < (*req.count() as usize) { + let log_results = |peer_id, blocks_sent| { + if blocks_sent < (req_count as usize) { debug!( self.log, "BlocksByRange outgoing response processed"; "peer" => %peer_id, "msg" => "Failed to return all requested blocks", - "start_slot" => req.start_slot(), + "start_slot" => req_start_slot, "current_slot" => current_slot, - "requested" => req.count(), + "requested" => req_count, "returned" => blocks_sent ); } else { @@ -762,9 +708,9 @@ impl NetworkBeaconProcessor { self.log, "BlocksByRange outgoing response processed"; "peer" => %peer_id, - "start_slot" => req.start_slot(), + "start_slot" => req_start_slot, "current_slot" => current_slot, - "requested" => req.count(), + "requested" => req_count, "returned" => blocks_sent ); } @@ -785,8 +731,7 @@ impl NetworkBeaconProcessor { Ok(Some(block)) => { // Due to skip slots, blocks could be out of the range, we ensure they // are in the range before sending - if block.slot() >= *req.start_slot() - && block.slot() < req.start_slot() + req.count() + if block.slot() >= req_start_slot && block.slot() < req_start_slot + req.count() { blocks_sent += 1; self.send_network_message(NetworkMessage::SendResponse { @@ -805,7 +750,7 @@ impl NetworkBeaconProcessor { "peer" => %peer_id, "request_root" => ?root ); - log_results(req, peer_id, blocks_sent); + log_results(peer_id, blocks_sent); return Err((RpcErrorResponse::ServerError, "Database inconsistency")); } Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => { @@ -815,7 +760,7 @@ impl NetworkBeaconProcessor { "block_root" => ?root, "reason" => "execution layer not synced", ); - log_results(req, peer_id, blocks_sent); + log_results(peer_id, blocks_sent); // send the stream terminator return Err(( RpcErrorResponse::ResourceUnavailable, @@ -843,17 +788,121 @@ impl NetworkBeaconProcessor { "error" => ?e ); } - log_results(req, peer_id, blocks_sent); + log_results(peer_id, blocks_sent); // send the stream terminator return Err((RpcErrorResponse::ServerError, "Failed fetching blocks")); } } } - log_results(req, peer_id, blocks_sent); + log_results(peer_id, blocks_sent); Ok(()) } + fn get_block_roots_for_slot_range( + &self, + req_start_slot: u64, + req_count: u64, + req_type: &str, + ) -> Result, (RpcErrorResponse, &'static str)> { + let block_roots_timer = std::time::Instant::now(); + let finalized_slot = self + .chain + .canonical_head + .cached_head() + .finalized_checkpoint() + .epoch + .start_slot(T::EthSpec::slots_per_epoch()); + + let (block_roots, block_roots_source) = if req_start_slot >= finalized_slot.as_u64() { + ( + self.chain + .block_roots_from_fork_choice(req_start_slot, req_count), + "fork_choice", + ) + } else { + ( + self.get_block_roots_from_store(req_start_slot, req_count)?, + "store", + ) + }; + + debug!( + self.log, + "Range request block roots retrieved"; + "req_type" => req_type, + "start_slot" => req_start_slot, + "count" => req_count, + "block_roots_count" => block_roots.len(), + "block_roots_source" => block_roots_source, + "elapsed" => ?block_roots_timer.elapsed() + ); + + Ok(block_roots) + } + + /// Get block roots for a `BlocksByRangeRequest` from the store using roots iterator. + fn get_block_roots_from_store( + &self, + start_slot: u64, + count: u64, + ) -> Result, (RpcErrorResponse, &'static str)> { + let forwards_block_root_iter = + match self.chain.forwards_iter_block_roots(Slot::from(start_slot)) { + Ok(iter) => iter, + Err(BeaconChainError::HistoricalBlockOutOfRange { + slot, + oldest_block_slot, + }) => { + debug!(self.log, "Range request failed during backfill"; + "requested_slot" => slot, + "oldest_known_slot" => oldest_block_slot + ); + return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling")); + } + Err(e) => { + error!(self.log, "Unable to obtain root iter for range request"; + "start_slot" => start_slot, + "count" => count, + "error" => ?e + ); + return Err((RpcErrorResponse::ServerError, "Database error")); + } + }; + + // Pick out the required blocks, ignoring skip-slots. + let mut last_block_root = None; + let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { + iter.take_while(|(_, slot)| slot.as_u64() < start_slot.saturating_add(count)) + // map skip slots to None + .map(|(root, _)| { + let result = if Some(root) == last_block_root { + None + } else { + Some(root) + }; + last_block_root = Some(root); + result + }) + .collect::>>() + }); + + let block_roots = match maybe_block_roots { + Ok(block_roots) => block_roots, + Err(e) => { + error!(self.log, "Error during iteration over blocks for range request"; + "start_slot" => start_slot, + "count" => count, + "error" => ?e + ); + return Err((RpcErrorResponse::ServerError, "Iteration error")); + } + }; + + // remove all skip slots + Ok(block_roots.into_iter().flatten().collect::>()) + } + /// Handle a `BlobsByRange` request from the peer. pub fn handle_blobs_by_range_request( self: Arc, @@ -932,65 +981,8 @@ impl NetworkBeaconProcessor { }; } - let forwards_block_root_iter = - match self.chain.forwards_iter_block_roots(request_start_slot) { - Ok(iter) => iter, - Err(BeaconChainError::HistoricalBlockOutOfRange { - slot, - oldest_block_slot, - }) => { - debug!(self.log, "Range request failed during backfill"; - "requested_slot" => slot, - "oldest_known_slot" => oldest_block_slot - ); - return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling")); - } - Err(e) => { - error!(self.log, "Unable to obtain root iter"; - "request" => ?req, - "peer" => %peer_id, - "error" => ?e - ); - return Err((RpcErrorResponse::ServerError, "Database error")); - } - }; - - // Use `WhenSlotSkipped::Prev` to get the most recent block root prior to - // `request_start_slot` in order to check whether the `request_start_slot` is a skip. - let mut last_block_root = req.start_slot.checked_sub(1).and_then(|prev_slot| { - self.chain - .block_root_at_slot(Slot::new(prev_slot), WhenSlotSkipped::Prev) - .ok() - .flatten() - }); - - // Pick out the required blocks, ignoring skip-slots. - let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { - iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count)) - // map skip slots to None - .map(|(root, _)| { - let result = if Some(root) == last_block_root { - None - } else { - Some(root) - }; - last_block_root = Some(root); - result - }) - .collect::>>() - }); - - let block_roots = match maybe_block_roots { - Ok(block_roots) => block_roots, - Err(e) => { - error!(self.log, "Error during iteration over blocks"; - "request" => ?req, - "peer" => %peer_id, - "error" => ?e - ); - return Err((RpcErrorResponse::ServerError, "Database error")); - } - }; + let block_roots = + self.get_block_roots_for_slot_range(req.start_slot, req.count, "BlobsByRange")?; let current_slot = self .chain @@ -1009,8 +1001,6 @@ impl NetworkBeaconProcessor { ); }; - // remove all skip slots - let block_roots = block_roots.into_iter().flatten(); let mut blobs_sent = 0; for root in block_roots { @@ -1136,68 +1126,8 @@ impl NetworkBeaconProcessor { }; } - let forwards_block_root_iter = - match self.chain.forwards_iter_block_roots(request_start_slot) { - Ok(iter) => iter, - Err(BeaconChainError::HistoricalBlockOutOfRange { - slot, - oldest_block_slot, - }) => { - debug!(self.log, "Range request failed during backfill"; - "requested_slot" => slot, - "oldest_known_slot" => oldest_block_slot - ); - return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling")); - } - Err(e) => { - error!(self.log, "Unable to obtain root iter"; - "request" => ?req, - "peer" => %peer_id, - "error" => ?e - ); - return Err((RpcErrorResponse::ServerError, "Database error")); - } - }; - - // Use `WhenSlotSkipped::Prev` to get the most recent block root prior to - // `request_start_slot` in order to check whether the `request_start_slot` is a skip. - let mut last_block_root = req.start_slot.checked_sub(1).and_then(|prev_slot| { - self.chain - .block_root_at_slot(Slot::new(prev_slot), WhenSlotSkipped::Prev) - .ok() - .flatten() - }); - - // Pick out the required blocks, ignoring skip-slots. - let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { - iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count)) - // map skip slots to None - .map(|(root, _)| { - let result = if Some(root) == last_block_root { - None - } else { - Some(root) - }; - last_block_root = Some(root); - result - }) - .collect::>>() - }); - - let block_roots = match maybe_block_roots { - Ok(block_roots) => block_roots, - Err(e) => { - error!(self.log, "Error during iteration over blocks"; - "request" => ?req, - "peer" => %peer_id, - "error" => ?e - ); - return Err((RpcErrorResponse::ServerError, "Database error")); - } - }; - - // remove all skip slots - let block_roots = block_roots.into_iter().flatten(); + let block_roots = + self.get_block_roots_for_slot_range(req.start_slot, req.count, "DataColumnsByRange")?; let mut data_columns_sent = 0; for root in block_roots { diff --git a/consensus/proto_array/src/proto_array_fork_choice.rs b/consensus/proto_array/src/proto_array_fork_choice.rs index 88d4660311..891590f475 100644 --- a/consensus/proto_array/src/proto_array_fork_choice.rs +++ b/consensus/proto_array/src/proto_array_fork_choice.rs @@ -856,10 +856,18 @@ impl ProtoArrayForkChoice { } /// See `ProtoArray::iter_nodes` - pub fn iter_nodes<'a>(&'a self, block_root: &Hash256) -> Iter<'a> { + pub fn iter_nodes(&self, block_root: &Hash256) -> Iter { self.proto_array.iter_nodes(block_root) } + /// See `ProtoArray::iter_block_roots` + pub fn iter_block_roots( + &self, + block_root: &Hash256, + ) -> impl Iterator + use<'_> { + self.proto_array.iter_block_roots(block_root) + } + pub fn as_bytes(&self) -> Vec { SszContainer::from(self).as_ssz_bytes() }