mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-07 16:55:46 +00:00
Compute roots for unfinalized by_range requests with fork-choice (#7098)
Includes PRs - https://github.com/sigp/lighthouse/pull/7058 - https://github.com/sigp/lighthouse/pull/7066 Cleaner for the `release-v7.0.0` branch
This commit is contained in:
@@ -7344,6 +7344,31 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
|
|
||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Retrieves block roots (in ascending slot order) within some slot range from fork choice.
|
||||||
|
pub fn block_roots_from_fork_choice(&self, start_slot: u64, count: u64) -> Vec<Hash256> {
|
||||||
|
let head_block_root = self.canonical_head.cached_head().head_block_root();
|
||||||
|
let fork_choice_read_lock = self.canonical_head.fork_choice_read_lock();
|
||||||
|
let block_roots_iter = fork_choice_read_lock
|
||||||
|
.proto_array()
|
||||||
|
.iter_block_roots(&head_block_root);
|
||||||
|
let end_slot = start_slot.saturating_add(count);
|
||||||
|
let mut roots = vec![];
|
||||||
|
|
||||||
|
for (root, slot) in block_roots_iter {
|
||||||
|
if slot < end_slot && slot >= start_slot {
|
||||||
|
roots.push(root);
|
||||||
|
}
|
||||||
|
if slot < start_slot {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
drop(fork_choice_read_lock);
|
||||||
|
// return in ascending slot order
|
||||||
|
roots.reverse();
|
||||||
|
roots
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: BeaconChainTypes> Drop for BeaconChain<T> {
|
impl<T: BeaconChainTypes> Drop for BeaconChain<T> {
|
||||||
|
|||||||
@@ -28,6 +28,7 @@ metrics = { workspace = true }
|
|||||||
network = { workspace = true }
|
network = { workspace = true }
|
||||||
operation_pool = { workspace = true }
|
operation_pool = { workspace = true }
|
||||||
parking_lot = { workspace = true }
|
parking_lot = { workspace = true }
|
||||||
|
proto_array = { workspace = true }
|
||||||
rand = { workspace = true }
|
rand = { workspace = true }
|
||||||
safe_arith = { workspace = true }
|
safe_arith = { workspace = true }
|
||||||
sensitive_url = { workspace = true }
|
sensitive_url = { workspace = true }
|
||||||
|
|||||||
@@ -87,6 +87,15 @@ pub static BEACON_PROCESSOR_IMPORT_ERRORS_PER_TYPE: LazyLock<Result<IntCounterVe
|
|||||||
&["source", "component", "type"],
|
&["source", "component", "type"],
|
||||||
)
|
)
|
||||||
});
|
});
|
||||||
|
pub static BEACON_PROCESSOR_GET_BLOCK_ROOTS_TIME: LazyLock<Result<HistogramVec>> =
|
||||||
|
LazyLock::new(|| {
|
||||||
|
try_create_histogram_vec_with_buckets(
|
||||||
|
"beacon_processor_get_block_roots_time_seconds",
|
||||||
|
"Time to complete get_block_roots when serving by_range requests",
|
||||||
|
decimal_buckets(-3, -1),
|
||||||
|
&["source"],
|
||||||
|
)
|
||||||
|
});
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Gossip processor
|
* Gossip processor
|
||||||
|
|||||||
@@ -1,9 +1,10 @@
|
|||||||
|
use crate::metrics;
|
||||||
use crate::network_beacon_processor::{NetworkBeaconProcessor, FUTURE_SLOT_TOLERANCE};
|
use crate::network_beacon_processor::{NetworkBeaconProcessor, FUTURE_SLOT_TOLERANCE};
|
||||||
use crate::service::NetworkMessage;
|
use crate::service::NetworkMessage;
|
||||||
use crate::status::ToStatusMessage;
|
use crate::status::ToStatusMessage;
|
||||||
use crate::sync::SyncMessage;
|
use crate::sync::SyncMessage;
|
||||||
use beacon_chain::{BeaconChainError, BeaconChainTypes, WhenSlotSkipped};
|
use beacon_chain::{BeaconChainError, BeaconChainTypes, WhenSlotSkipped};
|
||||||
use itertools::process_results;
|
use itertools::{process_results, Itertools};
|
||||||
use lighthouse_network::discovery::ConnectionId;
|
use lighthouse_network::discovery::ConnectionId;
|
||||||
use lighthouse_network::rpc::methods::{
|
use lighthouse_network::rpc::methods::{
|
||||||
BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest,
|
BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest,
|
||||||
@@ -675,86 +676,49 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
|||||||
request_id: RequestId,
|
request_id: RequestId,
|
||||||
req: BlocksByRangeRequest,
|
req: BlocksByRangeRequest,
|
||||||
) -> Result<(), (RpcErrorResponse, &'static str)> {
|
) -> Result<(), (RpcErrorResponse, &'static str)> {
|
||||||
|
let req_start_slot = *req.start_slot();
|
||||||
|
let req_count = *req.count();
|
||||||
|
|
||||||
debug!(self.log, "Received BlocksByRange Request";
|
debug!(self.log, "Received BlocksByRange Request";
|
||||||
"peer_id" => %peer_id,
|
"peer_id" => %peer_id,
|
||||||
"count" => req.count(),
|
"start_slot" => req_start_slot,
|
||||||
"start_slot" => req.start_slot(),
|
"count" => req_count,
|
||||||
);
|
);
|
||||||
|
|
||||||
let forwards_block_root_iter = match self
|
// Spawn a blocking handle since get_block_roots_for_slot_range takes a sync lock on the
|
||||||
.chain
|
// fork-choice.
|
||||||
.forwards_iter_block_roots(Slot::from(*req.start_slot()))
|
let network_beacon_processor = self.clone();
|
||||||
{
|
let block_roots = self
|
||||||
Ok(iter) => iter,
|
.executor
|
||||||
Err(BeaconChainError::HistoricalBlockOutOfRange {
|
.spawn_blocking_handle(
|
||||||
slot,
|
move || {
|
||||||
oldest_block_slot,
|
network_beacon_processor.get_block_roots_for_slot_range(
|
||||||
}) => {
|
req_start_slot,
|
||||||
debug!(self.log, "Range request failed during backfill";
|
req_count,
|
||||||
"requested_slot" => slot,
|
"BlocksByRange",
|
||||||
"oldest_known_slot" => oldest_block_slot
|
)
|
||||||
);
|
},
|
||||||
return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling"));
|
"get_block_roots_for_slot_range",
|
||||||
}
|
)
|
||||||
Err(e) => {
|
.ok_or((RpcErrorResponse::ServerError, "shutting down"))?
|
||||||
error!(self.log, "Unable to obtain root iter";
|
.await
|
||||||
"request" => ?req,
|
.map_err(|_| (RpcErrorResponse::ServerError, "tokio join"))??;
|
||||||
"peer" => %peer_id,
|
|
||||||
"error" => ?e
|
|
||||||
);
|
|
||||||
return Err((RpcErrorResponse::ServerError, "Database error"));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// Pick out the required blocks, ignoring skip-slots.
|
|
||||||
let mut last_block_root = None;
|
|
||||||
let maybe_block_roots = process_results(forwards_block_root_iter, |iter| {
|
|
||||||
iter.take_while(|(_, slot)| {
|
|
||||||
slot.as_u64() < req.start_slot().saturating_add(*req.count())
|
|
||||||
})
|
|
||||||
// map skip slots to None
|
|
||||||
.map(|(root, _)| {
|
|
||||||
let result = if Some(root) == last_block_root {
|
|
||||||
None
|
|
||||||
} else {
|
|
||||||
Some(root)
|
|
||||||
};
|
|
||||||
last_block_root = Some(root);
|
|
||||||
result
|
|
||||||
})
|
|
||||||
.collect::<Vec<Option<Hash256>>>()
|
|
||||||
});
|
|
||||||
|
|
||||||
let block_roots = match maybe_block_roots {
|
|
||||||
Ok(block_roots) => block_roots,
|
|
||||||
Err(e) => {
|
|
||||||
error!(self.log, "Error during iteration over blocks";
|
|
||||||
"request" => ?req,
|
|
||||||
"peer" => %peer_id,
|
|
||||||
"error" => ?e
|
|
||||||
);
|
|
||||||
return Err((RpcErrorResponse::ServerError, "Iteration error"));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// remove all skip slots
|
|
||||||
let block_roots = block_roots.into_iter().flatten().collect::<Vec<_>>();
|
|
||||||
|
|
||||||
let current_slot = self
|
let current_slot = self
|
||||||
.chain
|
.chain
|
||||||
.slot()
|
.slot()
|
||||||
.unwrap_or_else(|_| self.chain.slot_clock.genesis_slot());
|
.unwrap_or_else(|_| self.chain.slot_clock.genesis_slot());
|
||||||
|
|
||||||
let log_results = |req: BlocksByRangeRequest, peer_id, blocks_sent| {
|
let log_results = |peer_id, blocks_sent| {
|
||||||
if blocks_sent < (*req.count() as usize) {
|
if blocks_sent < (req_count as usize) {
|
||||||
debug!(
|
debug!(
|
||||||
self.log,
|
self.log,
|
||||||
"BlocksByRange outgoing response processed";
|
"BlocksByRange outgoing response processed";
|
||||||
"peer" => %peer_id,
|
"peer" => %peer_id,
|
||||||
"msg" => "Failed to return all requested blocks",
|
"msg" => "Failed to return all requested blocks",
|
||||||
"start_slot" => req.start_slot(),
|
"start_slot" => req_start_slot,
|
||||||
"current_slot" => current_slot,
|
"current_slot" => current_slot,
|
||||||
"requested" => req.count(),
|
"requested" => req_count,
|
||||||
"returned" => blocks_sent
|
"returned" => blocks_sent
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
@@ -762,9 +726,9 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
|||||||
self.log,
|
self.log,
|
||||||
"BlocksByRange outgoing response processed";
|
"BlocksByRange outgoing response processed";
|
||||||
"peer" => %peer_id,
|
"peer" => %peer_id,
|
||||||
"start_slot" => req.start_slot(),
|
"start_slot" => req_start_slot,
|
||||||
"current_slot" => current_slot,
|
"current_slot" => current_slot,
|
||||||
"requested" => req.count(),
|
"requested" => req_count,
|
||||||
"returned" => blocks_sent
|
"returned" => blocks_sent
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@@ -785,8 +749,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
|||||||
Ok(Some(block)) => {
|
Ok(Some(block)) => {
|
||||||
// Due to skip slots, blocks could be out of the range, we ensure they
|
// Due to skip slots, blocks could be out of the range, we ensure they
|
||||||
// are in the range before sending
|
// are in the range before sending
|
||||||
if block.slot() >= *req.start_slot()
|
if block.slot() >= req_start_slot && block.slot() < req_start_slot + req.count()
|
||||||
&& block.slot() < req.start_slot() + req.count()
|
|
||||||
{
|
{
|
||||||
blocks_sent += 1;
|
blocks_sent += 1;
|
||||||
self.send_network_message(NetworkMessage::SendResponse {
|
self.send_network_message(NetworkMessage::SendResponse {
|
||||||
@@ -805,7 +768,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
|||||||
"peer" => %peer_id,
|
"peer" => %peer_id,
|
||||||
"request_root" => ?root
|
"request_root" => ?root
|
||||||
);
|
);
|
||||||
log_results(req, peer_id, blocks_sent);
|
log_results(peer_id, blocks_sent);
|
||||||
return Err((RpcErrorResponse::ServerError, "Database inconsistency"));
|
return Err((RpcErrorResponse::ServerError, "Database inconsistency"));
|
||||||
}
|
}
|
||||||
Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => {
|
Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => {
|
||||||
@@ -815,7 +778,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
|||||||
"block_root" => ?root,
|
"block_root" => ?root,
|
||||||
"reason" => "execution layer not synced",
|
"reason" => "execution layer not synced",
|
||||||
);
|
);
|
||||||
log_results(req, peer_id, blocks_sent);
|
log_results(peer_id, blocks_sent);
|
||||||
// send the stream terminator
|
// send the stream terminator
|
||||||
return Err((
|
return Err((
|
||||||
RpcErrorResponse::ResourceUnavailable,
|
RpcErrorResponse::ResourceUnavailable,
|
||||||
@@ -843,17 +806,142 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
|||||||
"error" => ?e
|
"error" => ?e
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
log_results(req, peer_id, blocks_sent);
|
log_results(peer_id, blocks_sent);
|
||||||
// send the stream terminator
|
// send the stream terminator
|
||||||
return Err((RpcErrorResponse::ServerError, "Failed fetching blocks"));
|
return Err((RpcErrorResponse::ServerError, "Failed fetching blocks"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log_results(req, peer_id, blocks_sent);
|
log_results(peer_id, blocks_sent);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn get_block_roots_for_slot_range(
|
||||||
|
&self,
|
||||||
|
req_start_slot: u64,
|
||||||
|
req_count: u64,
|
||||||
|
req_type: &str,
|
||||||
|
) -> Result<Vec<Hash256>, (RpcErrorResponse, &'static str)> {
|
||||||
|
let start_time = std::time::Instant::now();
|
||||||
|
let finalized_slot = self
|
||||||
|
.chain
|
||||||
|
.canonical_head
|
||||||
|
.cached_head()
|
||||||
|
.finalized_checkpoint()
|
||||||
|
.epoch
|
||||||
|
.start_slot(T::EthSpec::slots_per_epoch());
|
||||||
|
|
||||||
|
let (block_roots, source) = if req_start_slot >= finalized_slot.as_u64() {
|
||||||
|
// If the entire requested range is after finalization, use fork_choice
|
||||||
|
(
|
||||||
|
self.chain
|
||||||
|
.block_roots_from_fork_choice(req_start_slot, req_count),
|
||||||
|
"fork_choice",
|
||||||
|
)
|
||||||
|
} else if req_start_slot + req_count <= finalized_slot.as_u64() {
|
||||||
|
// If the entire requested range is before finalization, use store
|
||||||
|
(
|
||||||
|
self.get_block_roots_from_store(req_start_slot, req_count)?,
|
||||||
|
"store",
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
// Split the request at the finalization boundary
|
||||||
|
let count_from_store = finalized_slot.as_u64() - req_start_slot;
|
||||||
|
let count_from_fork_choice = req_count - count_from_store;
|
||||||
|
let start_slot_fork_choice = finalized_slot.as_u64();
|
||||||
|
|
||||||
|
// Get roots from store (up to and including finalized slot)
|
||||||
|
let mut roots_from_store =
|
||||||
|
self.get_block_roots_from_store(req_start_slot, count_from_store)?;
|
||||||
|
|
||||||
|
// Get roots from fork choice (after finalized slot)
|
||||||
|
let roots_from_fork_choice = self
|
||||||
|
.chain
|
||||||
|
.block_roots_from_fork_choice(start_slot_fork_choice, count_from_fork_choice);
|
||||||
|
|
||||||
|
roots_from_store.extend(roots_from_fork_choice);
|
||||||
|
|
||||||
|
(roots_from_store, "mixed")
|
||||||
|
};
|
||||||
|
|
||||||
|
let elapsed = start_time.elapsed();
|
||||||
|
metrics::observe_timer_vec(
|
||||||
|
&metrics::BEACON_PROCESSOR_GET_BLOCK_ROOTS_TIME,
|
||||||
|
&[source],
|
||||||
|
elapsed,
|
||||||
|
);
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
self.log,
|
||||||
|
"Range request block roots retrieved";
|
||||||
|
"req_type" => req_type,
|
||||||
|
"start_slot" => req_start_slot,
|
||||||
|
"req_count" => req_count,
|
||||||
|
"roots_count" => block_roots.len(),
|
||||||
|
"source" => source,
|
||||||
|
"elapsed" => ?elapsed,
|
||||||
|
"finalized_slot" => finalized_slot
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(block_roots)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get block roots for a `BlocksByRangeRequest` from the store using roots iterator.
|
||||||
|
fn get_block_roots_from_store(
|
||||||
|
&self,
|
||||||
|
start_slot: u64,
|
||||||
|
count: u64,
|
||||||
|
) -> Result<Vec<Hash256>, (RpcErrorResponse, &'static str)> {
|
||||||
|
let forwards_block_root_iter =
|
||||||
|
match self.chain.forwards_iter_block_roots(Slot::from(start_slot)) {
|
||||||
|
Ok(iter) => iter,
|
||||||
|
Err(BeaconChainError::HistoricalBlockOutOfRange {
|
||||||
|
slot,
|
||||||
|
oldest_block_slot,
|
||||||
|
}) => {
|
||||||
|
debug!(self.log, "Range request failed during backfill";
|
||||||
|
"requested_slot" => slot,
|
||||||
|
"oldest_known_slot" => oldest_block_slot
|
||||||
|
);
|
||||||
|
return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling"));
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!(self.log, "Unable to obtain root iter for range request";
|
||||||
|
"start_slot" => start_slot,
|
||||||
|
"count" => count,
|
||||||
|
"error" => ?e
|
||||||
|
);
|
||||||
|
return Err((RpcErrorResponse::ServerError, "Database error"));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Pick out the required blocks, ignoring skip-slots.
|
||||||
|
let maybe_block_roots = process_results(forwards_block_root_iter, |iter| {
|
||||||
|
iter.take_while(|(_, slot)| slot.as_u64() < start_slot.saturating_add(count))
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
});
|
||||||
|
|
||||||
|
let block_roots = match maybe_block_roots {
|
||||||
|
Ok(block_roots) => block_roots,
|
||||||
|
Err(e) => {
|
||||||
|
error!(self.log, "Error during iteration over blocks for range request";
|
||||||
|
"start_slot" => start_slot,
|
||||||
|
"count" => count,
|
||||||
|
"error" => ?e
|
||||||
|
);
|
||||||
|
return Err((RpcErrorResponse::ServerError, "Iteration error"));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// remove all skip slots i.e. duplicated roots
|
||||||
|
Ok(block_roots
|
||||||
|
.into_iter()
|
||||||
|
.map(|(root, _)| root)
|
||||||
|
.unique()
|
||||||
|
.collect::<Vec<_>>())
|
||||||
|
}
|
||||||
|
|
||||||
/// Handle a `BlobsByRange` request from the peer.
|
/// Handle a `BlobsByRange` request from the peer.
|
||||||
pub fn handle_blobs_by_range_request(
|
pub fn handle_blobs_by_range_request(
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
@@ -932,65 +1020,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
let forwards_block_root_iter =
|
let block_roots =
|
||||||
match self.chain.forwards_iter_block_roots(request_start_slot) {
|
self.get_block_roots_for_slot_range(req.start_slot, req.count, "BlobsByRange")?;
|
||||||
Ok(iter) => iter,
|
|
||||||
Err(BeaconChainError::HistoricalBlockOutOfRange {
|
|
||||||
slot,
|
|
||||||
oldest_block_slot,
|
|
||||||
}) => {
|
|
||||||
debug!(self.log, "Range request failed during backfill";
|
|
||||||
"requested_slot" => slot,
|
|
||||||
"oldest_known_slot" => oldest_block_slot
|
|
||||||
);
|
|
||||||
return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling"));
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
error!(self.log, "Unable to obtain root iter";
|
|
||||||
"request" => ?req,
|
|
||||||
"peer" => %peer_id,
|
|
||||||
"error" => ?e
|
|
||||||
);
|
|
||||||
return Err((RpcErrorResponse::ServerError, "Database error"));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// Use `WhenSlotSkipped::Prev` to get the most recent block root prior to
|
|
||||||
// `request_start_slot` in order to check whether the `request_start_slot` is a skip.
|
|
||||||
let mut last_block_root = req.start_slot.checked_sub(1).and_then(|prev_slot| {
|
|
||||||
self.chain
|
|
||||||
.block_root_at_slot(Slot::new(prev_slot), WhenSlotSkipped::Prev)
|
|
||||||
.ok()
|
|
||||||
.flatten()
|
|
||||||
});
|
|
||||||
|
|
||||||
// Pick out the required blocks, ignoring skip-slots.
|
|
||||||
let maybe_block_roots = process_results(forwards_block_root_iter, |iter| {
|
|
||||||
iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count))
|
|
||||||
// map skip slots to None
|
|
||||||
.map(|(root, _)| {
|
|
||||||
let result = if Some(root) == last_block_root {
|
|
||||||
None
|
|
||||||
} else {
|
|
||||||
Some(root)
|
|
||||||
};
|
|
||||||
last_block_root = Some(root);
|
|
||||||
result
|
|
||||||
})
|
|
||||||
.collect::<Vec<Option<Hash256>>>()
|
|
||||||
});
|
|
||||||
|
|
||||||
let block_roots = match maybe_block_roots {
|
|
||||||
Ok(block_roots) => block_roots,
|
|
||||||
Err(e) => {
|
|
||||||
error!(self.log, "Error during iteration over blocks";
|
|
||||||
"request" => ?req,
|
|
||||||
"peer" => %peer_id,
|
|
||||||
"error" => ?e
|
|
||||||
);
|
|
||||||
return Err((RpcErrorResponse::ServerError, "Database error"));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let current_slot = self
|
let current_slot = self
|
||||||
.chain
|
.chain
|
||||||
@@ -1009,8 +1040,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
|||||||
);
|
);
|
||||||
};
|
};
|
||||||
|
|
||||||
// remove all skip slots
|
|
||||||
let block_roots = block_roots.into_iter().flatten();
|
|
||||||
let mut blobs_sent = 0;
|
let mut blobs_sent = 0;
|
||||||
|
|
||||||
for root in block_roots {
|
for root in block_roots {
|
||||||
@@ -1136,68 +1165,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
let forwards_block_root_iter =
|
let block_roots =
|
||||||
match self.chain.forwards_iter_block_roots(request_start_slot) {
|
self.get_block_roots_for_slot_range(req.start_slot, req.count, "DataColumnsByRange")?;
|
||||||
Ok(iter) => iter,
|
|
||||||
Err(BeaconChainError::HistoricalBlockOutOfRange {
|
|
||||||
slot,
|
|
||||||
oldest_block_slot,
|
|
||||||
}) => {
|
|
||||||
debug!(self.log, "Range request failed during backfill";
|
|
||||||
"requested_slot" => slot,
|
|
||||||
"oldest_known_slot" => oldest_block_slot
|
|
||||||
);
|
|
||||||
return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling"));
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
error!(self.log, "Unable to obtain root iter";
|
|
||||||
"request" => ?req,
|
|
||||||
"peer" => %peer_id,
|
|
||||||
"error" => ?e
|
|
||||||
);
|
|
||||||
return Err((RpcErrorResponse::ServerError, "Database error"));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// Use `WhenSlotSkipped::Prev` to get the most recent block root prior to
|
|
||||||
// `request_start_slot` in order to check whether the `request_start_slot` is a skip.
|
|
||||||
let mut last_block_root = req.start_slot.checked_sub(1).and_then(|prev_slot| {
|
|
||||||
self.chain
|
|
||||||
.block_root_at_slot(Slot::new(prev_slot), WhenSlotSkipped::Prev)
|
|
||||||
.ok()
|
|
||||||
.flatten()
|
|
||||||
});
|
|
||||||
|
|
||||||
// Pick out the required blocks, ignoring skip-slots.
|
|
||||||
let maybe_block_roots = process_results(forwards_block_root_iter, |iter| {
|
|
||||||
iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count))
|
|
||||||
// map skip slots to None
|
|
||||||
.map(|(root, _)| {
|
|
||||||
let result = if Some(root) == last_block_root {
|
|
||||||
None
|
|
||||||
} else {
|
|
||||||
Some(root)
|
|
||||||
};
|
|
||||||
last_block_root = Some(root);
|
|
||||||
result
|
|
||||||
})
|
|
||||||
.collect::<Vec<Option<Hash256>>>()
|
|
||||||
});
|
|
||||||
|
|
||||||
let block_roots = match maybe_block_roots {
|
|
||||||
Ok(block_roots) => block_roots,
|
|
||||||
Err(e) => {
|
|
||||||
error!(self.log, "Error during iteration over blocks";
|
|
||||||
"request" => ?req,
|
|
||||||
"peer" => %peer_id,
|
|
||||||
"error" => ?e
|
|
||||||
);
|
|
||||||
return Err((RpcErrorResponse::ServerError, "Database error"));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// remove all skip slots
|
|
||||||
let block_roots = block_roots.into_iter().flatten();
|
|
||||||
let mut data_columns_sent = 0;
|
let mut data_columns_sent = 0;
|
||||||
|
|
||||||
for root in block_roots {
|
for root in block_roots {
|
||||||
|
|||||||
@@ -856,10 +856,18 @@ impl ProtoArrayForkChoice {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// See `ProtoArray::iter_nodes`
|
/// See `ProtoArray::iter_nodes`
|
||||||
pub fn iter_nodes<'a>(&'a self, block_root: &Hash256) -> Iter<'a> {
|
pub fn iter_nodes(&self, block_root: &Hash256) -> Iter {
|
||||||
self.proto_array.iter_nodes(block_root)
|
self.proto_array.iter_nodes(block_root)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// See `ProtoArray::iter_block_roots`
|
||||||
|
pub fn iter_block_roots(
|
||||||
|
&self,
|
||||||
|
block_root: &Hash256,
|
||||||
|
) -> impl Iterator<Item = (Hash256, Slot)> + use<'_> {
|
||||||
|
self.proto_array.iter_block_roots(block_root)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn as_bytes(&self) -> Vec<u8> {
|
pub fn as_bytes(&self) -> Vec<u8> {
|
||||||
SszContainer::from(self).as_ssz_bytes()
|
SszContainer::from(self).as_ssz_bytes()
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user