mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-06 10:11:44 +00:00
Merge branch 'release-v7.0.0' into unstable
This commit is contained in:
@@ -88,6 +88,15 @@ pub static BEACON_PROCESSOR_IMPORT_ERRORS_PER_TYPE: LazyLock<Result<IntCounterVe
|
||||
&["source", "component", "type"],
|
||||
)
|
||||
});
|
||||
pub static BEACON_PROCESSOR_GET_BLOCK_ROOTS_TIME: LazyLock<Result<HistogramVec>> =
|
||||
LazyLock::new(|| {
|
||||
try_create_histogram_vec_with_buckets(
|
||||
"beacon_processor_get_block_roots_time_seconds",
|
||||
"Time to complete get_block_roots when serving by_range requests",
|
||||
decimal_buckets(-3, -1),
|
||||
&["source"],
|
||||
)
|
||||
});
|
||||
|
||||
/*
|
||||
* Gossip processor
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
use crate::metrics;
|
||||
use crate::network_beacon_processor::{NetworkBeaconProcessor, FUTURE_SLOT_TOLERANCE};
|
||||
use crate::service::NetworkMessage;
|
||||
use crate::status::ToStatusMessage;
|
||||
use crate::sync::SyncMessage;
|
||||
use beacon_chain::{BeaconChainError, BeaconChainTypes, WhenSlotSkipped};
|
||||
use itertools::process_results;
|
||||
use itertools::{process_results, Itertools};
|
||||
use lighthouse_network::rpc::methods::{
|
||||
BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest,
|
||||
};
|
||||
@@ -588,97 +589,57 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
inbound_request_id: InboundRequestId,
|
||||
req: BlocksByRangeRequest,
|
||||
) -> Result<(), (RpcErrorResponse, &'static str)> {
|
||||
let req_start_slot = *req.start_slot();
|
||||
let req_count = *req.count();
|
||||
|
||||
debug!(
|
||||
%peer_id,
|
||||
count = req.count(),
|
||||
start_slot = %req.start_slot(),
|
||||
count = req_count,
|
||||
start_slot = %req_start_slot,
|
||||
"Received BlocksByRange Request"
|
||||
);
|
||||
|
||||
let forwards_block_root_iter = match self
|
||||
.chain
|
||||
.forwards_iter_block_roots(Slot::from(*req.start_slot()))
|
||||
{
|
||||
Ok(iter) => iter,
|
||||
Err(BeaconChainError::HistoricalBlockOutOfRange {
|
||||
slot,
|
||||
oldest_block_slot,
|
||||
}) => {
|
||||
debug!(
|
||||
requested_slot = %slot,
|
||||
oldest_known_slot = %oldest_block_slot,
|
||||
"Range request failed during backfill"
|
||||
);
|
||||
return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling"));
|
||||
}
|
||||
Err(e) => {
|
||||
error!(
|
||||
request = ?req,
|
||||
%peer_id,
|
||||
error = ?e,
|
||||
"Unable to obtain root iter"
|
||||
);
|
||||
return Err((RpcErrorResponse::ServerError, "Database error"));
|
||||
}
|
||||
};
|
||||
|
||||
// Pick out the required blocks, ignoring skip-slots.
|
||||
let mut last_block_root = None;
|
||||
let maybe_block_roots = process_results(forwards_block_root_iter, |iter| {
|
||||
iter.take_while(|(_, slot)| {
|
||||
slot.as_u64() < req.start_slot().saturating_add(*req.count())
|
||||
})
|
||||
// map skip slots to None
|
||||
.map(|(root, _)| {
|
||||
let result = if Some(root) == last_block_root {
|
||||
None
|
||||
} else {
|
||||
Some(root)
|
||||
};
|
||||
last_block_root = Some(root);
|
||||
result
|
||||
})
|
||||
.collect::<Vec<Option<Hash256>>>()
|
||||
});
|
||||
|
||||
let block_roots = match maybe_block_roots {
|
||||
Ok(block_roots) => block_roots,
|
||||
Err(e) => {
|
||||
error!(
|
||||
request = ?req,
|
||||
%peer_id,
|
||||
error = ?e,
|
||||
"Error during iteration over blocks"
|
||||
);
|
||||
return Err((RpcErrorResponse::ServerError, "Iteration error"));
|
||||
}
|
||||
};
|
||||
|
||||
// remove all skip slots
|
||||
let block_roots = block_roots.into_iter().flatten().collect::<Vec<_>>();
|
||||
// Spawn a blocking handle since get_block_roots_for_slot_range takes a sync lock on the
|
||||
// fork-choice.
|
||||
let network_beacon_processor = self.clone();
|
||||
let block_roots = self
|
||||
.executor
|
||||
.spawn_blocking_handle(
|
||||
move || {
|
||||
network_beacon_processor.get_block_roots_for_slot_range(
|
||||
req_start_slot,
|
||||
req_count,
|
||||
"BlocksByRange",
|
||||
)
|
||||
},
|
||||
"get_block_roots_for_slot_range",
|
||||
)
|
||||
.ok_or((RpcErrorResponse::ServerError, "shutting down"))?
|
||||
.await
|
||||
.map_err(|_| (RpcErrorResponse::ServerError, "tokio join"))??;
|
||||
|
||||
let current_slot = self
|
||||
.chain
|
||||
.slot()
|
||||
.unwrap_or_else(|_| self.chain.slot_clock.genesis_slot());
|
||||
|
||||
let log_results = |req: BlocksByRangeRequest, peer_id, blocks_sent| {
|
||||
if blocks_sent < (*req.count() as usize) {
|
||||
let log_results = |peer_id, blocks_sent| {
|
||||
if blocks_sent < (req_count as usize) {
|
||||
debug!(
|
||||
%peer_id,
|
||||
msg = "Failed to return all requested blocks",
|
||||
start_slot = %req.start_slot(),
|
||||
start_slot = %req_start_slot,
|
||||
%current_slot,
|
||||
requested = req.count(),
|
||||
requested = req_count,
|
||||
returned = blocks_sent,
|
||||
"BlocksByRange outgoing response processed"
|
||||
);
|
||||
} else {
|
||||
debug!(
|
||||
%peer_id,
|
||||
start_slot = %req.start_slot(),
|
||||
start_slot = %req_start_slot,
|
||||
%current_slot,
|
||||
requested = req.count(),
|
||||
requested = req_count,
|
||||
returned = blocks_sent,
|
||||
"BlocksByRange outgoing response processed"
|
||||
);
|
||||
@@ -700,8 +661,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
Ok(Some(block)) => {
|
||||
// Due to skip slots, blocks could be out of the range, we ensure they
|
||||
// are in the range before sending
|
||||
if block.slot() >= *req.start_slot()
|
||||
&& block.slot() < req.start_slot() + req.count()
|
||||
if block.slot() >= req_start_slot && block.slot() < req_start_slot + req.count()
|
||||
{
|
||||
blocks_sent += 1;
|
||||
self.send_network_message(NetworkMessage::SendResponse {
|
||||
@@ -718,7 +678,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
request_root = ?root,
|
||||
"Block in the chain is not in the store"
|
||||
);
|
||||
log_results(req, peer_id, blocks_sent);
|
||||
log_results(peer_id, blocks_sent);
|
||||
return Err((RpcErrorResponse::ServerError, "Database inconsistency"));
|
||||
}
|
||||
Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => {
|
||||
@@ -727,7 +687,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
reason = "execution layer not synced",
|
||||
"Failed to fetch execution payload for blocks by range request"
|
||||
);
|
||||
log_results(req, peer_id, blocks_sent);
|
||||
log_results(peer_id, blocks_sent);
|
||||
// send the stream terminator
|
||||
return Err((
|
||||
RpcErrorResponse::ResourceUnavailable,
|
||||
@@ -753,17 +713,144 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
"Error fetching block for peer"
|
||||
);
|
||||
}
|
||||
log_results(req, peer_id, blocks_sent);
|
||||
log_results(peer_id, blocks_sent);
|
||||
// send the stream terminator
|
||||
return Err((RpcErrorResponse::ServerError, "Failed fetching blocks"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log_results(req, peer_id, blocks_sent);
|
||||
log_results(peer_id, blocks_sent);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_block_roots_for_slot_range(
|
||||
&self,
|
||||
req_start_slot: u64,
|
||||
req_count: u64,
|
||||
req_type: &str,
|
||||
) -> Result<Vec<Hash256>, (RpcErrorResponse, &'static str)> {
|
||||
let start_time = std::time::Instant::now();
|
||||
let finalized_slot = self
|
||||
.chain
|
||||
.canonical_head
|
||||
.cached_head()
|
||||
.finalized_checkpoint()
|
||||
.epoch
|
||||
.start_slot(T::EthSpec::slots_per_epoch());
|
||||
|
||||
let (block_roots, source) = if req_start_slot >= finalized_slot.as_u64() {
|
||||
// If the entire requested range is after finalization, use fork_choice
|
||||
(
|
||||
self.chain
|
||||
.block_roots_from_fork_choice(req_start_slot, req_count),
|
||||
"fork_choice",
|
||||
)
|
||||
} else if req_start_slot + req_count <= finalized_slot.as_u64() {
|
||||
// If the entire requested range is before finalization, use store
|
||||
(
|
||||
self.get_block_roots_from_store(req_start_slot, req_count)?,
|
||||
"store",
|
||||
)
|
||||
} else {
|
||||
// Split the request at the finalization boundary
|
||||
let count_from_store = finalized_slot.as_u64() - req_start_slot;
|
||||
let count_from_fork_choice = req_count - count_from_store;
|
||||
let start_slot_fork_choice = finalized_slot.as_u64();
|
||||
|
||||
// Get roots from store (up to and including finalized slot)
|
||||
let mut roots_from_store =
|
||||
self.get_block_roots_from_store(req_start_slot, count_from_store)?;
|
||||
|
||||
// Get roots from fork choice (after finalized slot)
|
||||
let roots_from_fork_choice = self
|
||||
.chain
|
||||
.block_roots_from_fork_choice(start_slot_fork_choice, count_from_fork_choice);
|
||||
|
||||
roots_from_store.extend(roots_from_fork_choice);
|
||||
|
||||
(roots_from_store, "mixed")
|
||||
};
|
||||
|
||||
let elapsed = start_time.elapsed();
|
||||
metrics::observe_timer_vec(
|
||||
&metrics::BEACON_PROCESSOR_GET_BLOCK_ROOTS_TIME,
|
||||
&[source],
|
||||
elapsed,
|
||||
);
|
||||
|
||||
debug!(
|
||||
req_type,
|
||||
start_slot = %req_start_slot,
|
||||
req_count,
|
||||
roots_count = block_roots.len(),
|
||||
source,
|
||||
elapsed = ?elapsed,
|
||||
%finalized_slot,
|
||||
"Range request block roots retrieved"
|
||||
);
|
||||
|
||||
Ok(block_roots)
|
||||
}
|
||||
|
||||
/// Get block roots for a `BlocksByRangeRequest` from the store using roots iterator.
|
||||
fn get_block_roots_from_store(
|
||||
&self,
|
||||
start_slot: u64,
|
||||
count: u64,
|
||||
) -> Result<Vec<Hash256>, (RpcErrorResponse, &'static str)> {
|
||||
let forwards_block_root_iter =
|
||||
match self.chain.forwards_iter_block_roots(Slot::from(start_slot)) {
|
||||
Ok(iter) => iter,
|
||||
Err(BeaconChainError::HistoricalBlockOutOfRange {
|
||||
slot,
|
||||
oldest_block_slot,
|
||||
}) => {
|
||||
debug!(
|
||||
requested_slot = %slot,
|
||||
oldest_known_slot = %oldest_block_slot,
|
||||
"Range request failed during backfill"
|
||||
);
|
||||
return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling"));
|
||||
}
|
||||
Err(e) => {
|
||||
error!(
|
||||
%start_slot,
|
||||
count,
|
||||
error = ?e,
|
||||
"Unable to obtain root iter for range request"
|
||||
);
|
||||
return Err((RpcErrorResponse::ServerError, "Database error"));
|
||||
}
|
||||
};
|
||||
|
||||
// Pick out the required blocks, ignoring skip-slots.
|
||||
let maybe_block_roots = process_results(forwards_block_root_iter, |iter| {
|
||||
iter.take_while(|(_, slot)| slot.as_u64() < start_slot.saturating_add(count))
|
||||
.collect::<Vec<_>>()
|
||||
});
|
||||
|
||||
let block_roots = match maybe_block_roots {
|
||||
Ok(block_roots) => block_roots,
|
||||
Err(e) => {
|
||||
error!(
|
||||
%start_slot,
|
||||
count,
|
||||
error = ?e,
|
||||
"Error during iteration over blocks for range request"
|
||||
);
|
||||
return Err((RpcErrorResponse::ServerError, "Iteration error"));
|
||||
}
|
||||
};
|
||||
|
||||
// remove all skip slots i.e. duplicated roots
|
||||
Ok(block_roots
|
||||
.into_iter()
|
||||
.map(|(root, _)| root)
|
||||
.unique()
|
||||
.collect::<Vec<_>>())
|
||||
}
|
||||
|
||||
/// Handle a `BlobsByRange` request from the peer.
|
||||
pub fn handle_blobs_by_range_request(
|
||||
self: Arc<Self>,
|
||||
@@ -830,68 +917,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
};
|
||||
}
|
||||
|
||||
let forwards_block_root_iter =
|
||||
match self.chain.forwards_iter_block_roots(request_start_slot) {
|
||||
Ok(iter) => iter,
|
||||
Err(BeaconChainError::HistoricalBlockOutOfRange {
|
||||
slot,
|
||||
oldest_block_slot,
|
||||
}) => {
|
||||
debug!(
|
||||
requested_slot = %slot,
|
||||
oldest_known_slot = %oldest_block_slot,
|
||||
"Range request failed during backfill"
|
||||
);
|
||||
return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling"));
|
||||
}
|
||||
Err(e) => {
|
||||
error!(
|
||||
request = ?req,
|
||||
%peer_id,
|
||||
error = ?e,
|
||||
"Unable to obtain root iter"
|
||||
);
|
||||
return Err((RpcErrorResponse::ServerError, "Database error"));
|
||||
}
|
||||
};
|
||||
|
||||
// Use `WhenSlotSkipped::Prev` to get the most recent block root prior to
|
||||
// `request_start_slot` in order to check whether the `request_start_slot` is a skip.
|
||||
let mut last_block_root = req.start_slot.checked_sub(1).and_then(|prev_slot| {
|
||||
self.chain
|
||||
.block_root_at_slot(Slot::new(prev_slot), WhenSlotSkipped::Prev)
|
||||
.ok()
|
||||
.flatten()
|
||||
});
|
||||
|
||||
// Pick out the required blocks, ignoring skip-slots.
|
||||
let maybe_block_roots = process_results(forwards_block_root_iter, |iter| {
|
||||
iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count))
|
||||
// map skip slots to None
|
||||
.map(|(root, _)| {
|
||||
let result = if Some(root) == last_block_root {
|
||||
None
|
||||
} else {
|
||||
Some(root)
|
||||
};
|
||||
last_block_root = Some(root);
|
||||
result
|
||||
})
|
||||
.collect::<Vec<Option<Hash256>>>()
|
||||
});
|
||||
|
||||
let block_roots = match maybe_block_roots {
|
||||
Ok(block_roots) => block_roots,
|
||||
Err(e) => {
|
||||
error!(
|
||||
request = ?req,
|
||||
%peer_id,
|
||||
error = ?e,
|
||||
"Error during iteration over blocks"
|
||||
);
|
||||
return Err((RpcErrorResponse::ServerError, "Database error"));
|
||||
}
|
||||
};
|
||||
let block_roots =
|
||||
self.get_block_roots_for_slot_range(req.start_slot, req.count, "BlobsByRange")?;
|
||||
|
||||
let current_slot = self
|
||||
.chain
|
||||
@@ -909,8 +936,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
);
|
||||
};
|
||||
|
||||
// remove all skip slots
|
||||
let block_roots = block_roots.into_iter().flatten();
|
||||
let mut blobs_sent = 0;
|
||||
|
||||
for root in block_roots {
|
||||
@@ -1022,71 +1047,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
};
|
||||
}
|
||||
|
||||
let forwards_block_root_iter =
|
||||
match self.chain.forwards_iter_block_roots(request_start_slot) {
|
||||
Ok(iter) => iter,
|
||||
Err(BeaconChainError::HistoricalBlockOutOfRange {
|
||||
slot,
|
||||
oldest_block_slot,
|
||||
}) => {
|
||||
debug!(
|
||||
requested_slot = %slot,
|
||||
oldest_known_slot = %oldest_block_slot,
|
||||
"Range request failed during backfill"
|
||||
);
|
||||
return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling"));
|
||||
}
|
||||
Err(e) => {
|
||||
error!(
|
||||
request = ?req,
|
||||
%peer_id,
|
||||
error = ?e,
|
||||
"Unable to obtain root iter"
|
||||
);
|
||||
return Err((RpcErrorResponse::ServerError, "Database error"));
|
||||
}
|
||||
};
|
||||
|
||||
// Use `WhenSlotSkipped::Prev` to get the most recent block root prior to
|
||||
// `request_start_slot` in order to check whether the `request_start_slot` is a skip.
|
||||
let mut last_block_root = req.start_slot.checked_sub(1).and_then(|prev_slot| {
|
||||
self.chain
|
||||
.block_root_at_slot(Slot::new(prev_slot), WhenSlotSkipped::Prev)
|
||||
.ok()
|
||||
.flatten()
|
||||
});
|
||||
|
||||
// Pick out the required blocks, ignoring skip-slots.
|
||||
let maybe_block_roots = process_results(forwards_block_root_iter, |iter| {
|
||||
iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count))
|
||||
// map skip slots to None
|
||||
.map(|(root, _)| {
|
||||
let result = if Some(root) == last_block_root {
|
||||
None
|
||||
} else {
|
||||
Some(root)
|
||||
};
|
||||
last_block_root = Some(root);
|
||||
result
|
||||
})
|
||||
.collect::<Vec<Option<Hash256>>>()
|
||||
});
|
||||
|
||||
let block_roots = match maybe_block_roots {
|
||||
Ok(block_roots) => block_roots,
|
||||
Err(e) => {
|
||||
error!(
|
||||
request = ?req,
|
||||
%peer_id,
|
||||
error = ?e,
|
||||
"Error during iteration over blocks"
|
||||
);
|
||||
return Err((RpcErrorResponse::ServerError, "Database error"));
|
||||
}
|
||||
};
|
||||
|
||||
// remove all skip slots
|
||||
let block_roots = block_roots.into_iter().flatten();
|
||||
let block_roots =
|
||||
self.get_block_roots_for_slot_range(req.start_slot, req.count, "DataColumnsByRange")?;
|
||||
let mut data_columns_sent = 0;
|
||||
|
||||
for root in block_roots {
|
||||
|
||||
Reference in New Issue
Block a user