Load block roots from fork choice where possible when serving BlocksByRange requests (#7058)

* Load block roots from fork choice where possible to avoid loading state from disk when serving block by range requests.

* Check if the start slot is newer than finalization (`start_slot >= finalized_slot`), and use fork choice in that case.
This commit is contained in:
Jimmy Chen
2025-03-01 08:41:24 +11:00
committed by GitHub
parent 9f15f31942
commit 8706040094
3 changed files with 160 additions and 197 deletions

View File

@@ -7282,6 +7282,31 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(None) Ok(None)
} }
/// Retrieves block roots (in ascending slot order) within some slot range from fork choice.
pub fn block_roots_from_fork_choice(&self, start_slot: u64, count: u64) -> Vec<Hash256> {
let head_block_root = self.canonical_head.cached_head().head_block_root();
let fork_choice_read_lock = self.canonical_head.fork_choice_read_lock();
let block_roots_iter = fork_choice_read_lock
.proto_array()
.iter_block_roots(&head_block_root);
let end_slot = start_slot.saturating_add(count);
let mut roots = vec![];
for (root, slot) in block_roots_iter {
if slot < end_slot && slot >= start_slot {
roots.push(root);
}
if slot < start_slot {
break;
}
}
drop(fork_choice_read_lock);
// return in ascending slot order
roots.reverse();
roots
}
} }
impl<T: BeaconChainTypes> Drop for BeaconChain<T> { impl<T: BeaconChainTypes> Drop for BeaconChain<T> {

View File

@@ -675,86 +675,32 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
request_id: RequestId, request_id: RequestId,
req: BlocksByRangeRequest, req: BlocksByRangeRequest,
) -> Result<(), (RpcErrorResponse, &'static str)> { ) -> Result<(), (RpcErrorResponse, &'static str)> {
let req_start_slot = *req.start_slot();
let req_count = *req.count();
debug!(self.log, "Received BlocksByRange Request"; debug!(self.log, "Received BlocksByRange Request";
"peer_id" => %peer_id, "peer_id" => %peer_id,
"count" => req.count(), "start_slot" => req_start_slot,
"start_slot" => req.start_slot(), "count" => req_count,
); );
let forwards_block_root_iter = match self let block_roots =
.chain self.get_block_roots_for_slot_range(req_start_slot, req_count, "BlocksByRange")?;
.forwards_iter_block_roots(Slot::from(*req.start_slot()))
{
Ok(iter) => iter,
Err(BeaconChainError::HistoricalBlockOutOfRange {
slot,
oldest_block_slot,
}) => {
debug!(self.log, "Range request failed during backfill";
"requested_slot" => slot,
"oldest_known_slot" => oldest_block_slot
);
return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling"));
}
Err(e) => {
error!(self.log, "Unable to obtain root iter";
"request" => ?req,
"peer" => %peer_id,
"error" => ?e
);
return Err((RpcErrorResponse::ServerError, "Database error"));
}
};
// Pick out the required blocks, ignoring skip-slots.
let mut last_block_root = None;
let maybe_block_roots = process_results(forwards_block_root_iter, |iter| {
iter.take_while(|(_, slot)| {
slot.as_u64() < req.start_slot().saturating_add(*req.count())
})
// map skip slots to None
.map(|(root, _)| {
let result = if Some(root) == last_block_root {
None
} else {
Some(root)
};
last_block_root = Some(root);
result
})
.collect::<Vec<Option<Hash256>>>()
});
let block_roots = match maybe_block_roots {
Ok(block_roots) => block_roots,
Err(e) => {
error!(self.log, "Error during iteration over blocks";
"request" => ?req,
"peer" => %peer_id,
"error" => ?e
);
return Err((RpcErrorResponse::ServerError, "Iteration error"));
}
};
// remove all skip slots
let block_roots = block_roots.into_iter().flatten().collect::<Vec<_>>();
let current_slot = self let current_slot = self
.chain .chain
.slot() .slot()
.unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()); .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot());
let log_results = |req: BlocksByRangeRequest, peer_id, blocks_sent| { let log_results = |peer_id, blocks_sent| {
if blocks_sent < (*req.count() as usize) { if blocks_sent < (req_count as usize) {
debug!( debug!(
self.log, self.log,
"BlocksByRange outgoing response processed"; "BlocksByRange outgoing response processed";
"peer" => %peer_id, "peer" => %peer_id,
"msg" => "Failed to return all requested blocks", "msg" => "Failed to return all requested blocks",
"start_slot" => req.start_slot(), "start_slot" => req_start_slot,
"current_slot" => current_slot, "current_slot" => current_slot,
"requested" => req.count(), "requested" => req_count,
"returned" => blocks_sent "returned" => blocks_sent
); );
} else { } else {
@@ -762,9 +708,9 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
self.log, self.log,
"BlocksByRange outgoing response processed"; "BlocksByRange outgoing response processed";
"peer" => %peer_id, "peer" => %peer_id,
"start_slot" => req.start_slot(), "start_slot" => req_start_slot,
"current_slot" => current_slot, "current_slot" => current_slot,
"requested" => req.count(), "requested" => req_count,
"returned" => blocks_sent "returned" => blocks_sent
); );
} }
@@ -785,8 +731,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
Ok(Some(block)) => { Ok(Some(block)) => {
// Due to skip slots, blocks could be out of the range, we ensure they // Due to skip slots, blocks could be out of the range, we ensure they
// are in the range before sending // are in the range before sending
if block.slot() >= *req.start_slot() if block.slot() >= req_start_slot && block.slot() < req_start_slot + req.count()
&& block.slot() < req.start_slot() + req.count()
{ {
blocks_sent += 1; blocks_sent += 1;
self.send_network_message(NetworkMessage::SendResponse { self.send_network_message(NetworkMessage::SendResponse {
@@ -805,7 +750,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"peer" => %peer_id, "peer" => %peer_id,
"request_root" => ?root "request_root" => ?root
); );
log_results(req, peer_id, blocks_sent); log_results(peer_id, blocks_sent);
return Err((RpcErrorResponse::ServerError, "Database inconsistency")); return Err((RpcErrorResponse::ServerError, "Database inconsistency"));
} }
Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => { Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => {
@@ -815,7 +760,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"block_root" => ?root, "block_root" => ?root,
"reason" => "execution layer not synced", "reason" => "execution layer not synced",
); );
log_results(req, peer_id, blocks_sent); log_results(peer_id, blocks_sent);
// send the stream terminator // send the stream terminator
return Err(( return Err((
RpcErrorResponse::ResourceUnavailable, RpcErrorResponse::ResourceUnavailable,
@@ -843,17 +788,121 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"error" => ?e "error" => ?e
); );
} }
log_results(req, peer_id, blocks_sent); log_results(peer_id, blocks_sent);
// send the stream terminator // send the stream terminator
return Err((RpcErrorResponse::ServerError, "Failed fetching blocks")); return Err((RpcErrorResponse::ServerError, "Failed fetching blocks"));
} }
} }
} }
log_results(req, peer_id, blocks_sent); log_results(peer_id, blocks_sent);
Ok(()) Ok(())
} }
fn get_block_roots_for_slot_range(
&self,
req_start_slot: u64,
req_count: u64,
req_type: &str,
) -> Result<Vec<Hash256>, (RpcErrorResponse, &'static str)> {
let block_roots_timer = std::time::Instant::now();
let finalized_slot = self
.chain
.canonical_head
.cached_head()
.finalized_checkpoint()
.epoch
.start_slot(T::EthSpec::slots_per_epoch());
let (block_roots, block_roots_source) = if req_start_slot >= finalized_slot.as_u64() {
(
self.chain
.block_roots_from_fork_choice(req_start_slot, req_count),
"fork_choice",
)
} else {
(
self.get_block_roots_from_store(req_start_slot, req_count)?,
"store",
)
};
debug!(
self.log,
"Range request block roots retrieved";
"req_type" => req_type,
"start_slot" => req_start_slot,
"count" => req_count,
"block_roots_count" => block_roots.len(),
"block_roots_source" => block_roots_source,
"elapsed" => ?block_roots_timer.elapsed()
);
Ok(block_roots)
}
/// Get block roots for a `BlocksByRangeRequest` from the store using roots iterator.
fn get_block_roots_from_store(
&self,
start_slot: u64,
count: u64,
) -> Result<Vec<Hash256>, (RpcErrorResponse, &'static str)> {
let forwards_block_root_iter =
match self.chain.forwards_iter_block_roots(Slot::from(start_slot)) {
Ok(iter) => iter,
Err(BeaconChainError::HistoricalBlockOutOfRange {
slot,
oldest_block_slot,
}) => {
debug!(self.log, "Range request failed during backfill";
"requested_slot" => slot,
"oldest_known_slot" => oldest_block_slot
);
return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling"));
}
Err(e) => {
error!(self.log, "Unable to obtain root iter for range request";
"start_slot" => start_slot,
"count" => count,
"error" => ?e
);
return Err((RpcErrorResponse::ServerError, "Database error"));
}
};
// Pick out the required blocks, ignoring skip-slots.
let mut last_block_root = None;
let maybe_block_roots = process_results(forwards_block_root_iter, |iter| {
iter.take_while(|(_, slot)| slot.as_u64() < start_slot.saturating_add(count))
// map skip slots to None
.map(|(root, _)| {
let result = if Some(root) == last_block_root {
None
} else {
Some(root)
};
last_block_root = Some(root);
result
})
.collect::<Vec<Option<Hash256>>>()
});
let block_roots = match maybe_block_roots {
Ok(block_roots) => block_roots,
Err(e) => {
error!(self.log, "Error during iteration over blocks for range request";
"start_slot" => start_slot,
"count" => count,
"error" => ?e
);
return Err((RpcErrorResponse::ServerError, "Iteration error"));
}
};
// remove all skip slots
Ok(block_roots.into_iter().flatten().collect::<Vec<_>>())
}
/// Handle a `BlobsByRange` request from the peer. /// Handle a `BlobsByRange` request from the peer.
pub fn handle_blobs_by_range_request( pub fn handle_blobs_by_range_request(
self: Arc<Self>, self: Arc<Self>,
@@ -932,65 +981,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}; };
} }
let forwards_block_root_iter = let block_roots =
match self.chain.forwards_iter_block_roots(request_start_slot) { self.get_block_roots_for_slot_range(req.start_slot, req.count, "BlobsByRange")?;
Ok(iter) => iter,
Err(BeaconChainError::HistoricalBlockOutOfRange {
slot,
oldest_block_slot,
}) => {
debug!(self.log, "Range request failed during backfill";
"requested_slot" => slot,
"oldest_known_slot" => oldest_block_slot
);
return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling"));
}
Err(e) => {
error!(self.log, "Unable to obtain root iter";
"request" => ?req,
"peer" => %peer_id,
"error" => ?e
);
return Err((RpcErrorResponse::ServerError, "Database error"));
}
};
// Use `WhenSlotSkipped::Prev` to get the most recent block root prior to
// `request_start_slot` in order to check whether the `request_start_slot` is a skip.
let mut last_block_root = req.start_slot.checked_sub(1).and_then(|prev_slot| {
self.chain
.block_root_at_slot(Slot::new(prev_slot), WhenSlotSkipped::Prev)
.ok()
.flatten()
});
// Pick out the required blocks, ignoring skip-slots.
let maybe_block_roots = process_results(forwards_block_root_iter, |iter| {
iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count))
// map skip slots to None
.map(|(root, _)| {
let result = if Some(root) == last_block_root {
None
} else {
Some(root)
};
last_block_root = Some(root);
result
})
.collect::<Vec<Option<Hash256>>>()
});
let block_roots = match maybe_block_roots {
Ok(block_roots) => block_roots,
Err(e) => {
error!(self.log, "Error during iteration over blocks";
"request" => ?req,
"peer" => %peer_id,
"error" => ?e
);
return Err((RpcErrorResponse::ServerError, "Database error"));
}
};
let current_slot = self let current_slot = self
.chain .chain
@@ -1009,8 +1001,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
); );
}; };
// remove all skip slots
let block_roots = block_roots.into_iter().flatten();
let mut blobs_sent = 0; let mut blobs_sent = 0;
for root in block_roots { for root in block_roots {
@@ -1136,68 +1126,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}; };
} }
let forwards_block_root_iter = let block_roots =
match self.chain.forwards_iter_block_roots(request_start_slot) { self.get_block_roots_for_slot_range(req.start_slot, req.count, "DataColumnsByRange")?;
Ok(iter) => iter,
Err(BeaconChainError::HistoricalBlockOutOfRange {
slot,
oldest_block_slot,
}) => {
debug!(self.log, "Range request failed during backfill";
"requested_slot" => slot,
"oldest_known_slot" => oldest_block_slot
);
return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling"));
}
Err(e) => {
error!(self.log, "Unable to obtain root iter";
"request" => ?req,
"peer" => %peer_id,
"error" => ?e
);
return Err((RpcErrorResponse::ServerError, "Database error"));
}
};
// Use `WhenSlotSkipped::Prev` to get the most recent block root prior to
// `request_start_slot` in order to check whether the `request_start_slot` is a skip.
let mut last_block_root = req.start_slot.checked_sub(1).and_then(|prev_slot| {
self.chain
.block_root_at_slot(Slot::new(prev_slot), WhenSlotSkipped::Prev)
.ok()
.flatten()
});
// Pick out the required blocks, ignoring skip-slots.
let maybe_block_roots = process_results(forwards_block_root_iter, |iter| {
iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count))
// map skip slots to None
.map(|(root, _)| {
let result = if Some(root) == last_block_root {
None
} else {
Some(root)
};
last_block_root = Some(root);
result
})
.collect::<Vec<Option<Hash256>>>()
});
let block_roots = match maybe_block_roots {
Ok(block_roots) => block_roots,
Err(e) => {
error!(self.log, "Error during iteration over blocks";
"request" => ?req,
"peer" => %peer_id,
"error" => ?e
);
return Err((RpcErrorResponse::ServerError, "Database error"));
}
};
// remove all skip slots
let block_roots = block_roots.into_iter().flatten();
let mut data_columns_sent = 0; let mut data_columns_sent = 0;
for root in block_roots { for root in block_roots {

View File

@@ -856,10 +856,18 @@ impl ProtoArrayForkChoice {
} }
/// See `ProtoArray::iter_nodes` /// See `ProtoArray::iter_nodes`
pub fn iter_nodes<'a>(&'a self, block_root: &Hash256) -> Iter<'a> { pub fn iter_nodes(&self, block_root: &Hash256) -> Iter {
self.proto_array.iter_nodes(block_root) self.proto_array.iter_nodes(block_root)
} }
/// See `ProtoArray::iter_block_roots`
pub fn iter_block_roots(
&self,
block_root: &Hash256,
) -> impl Iterator<Item = (Hash256, Slot)> + use<'_> {
self.proto_array.iter_block_roots(block_root)
}
pub fn as_bytes(&self) -> Vec<u8> { pub fn as_bytes(&self) -> Vec<u8> {
SszContainer::from(self).as_ssz_bytes() SszContainer::from(self).as_ssz_bytes()
} }