Allow custody by root requests to have no peers (#6417)

* Allow custody by root requests to have no peers

@@ -35,7 +35,9 @@
 use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart};
 use super::block_lookups::BlockLookups;
-use super::network_context::{BlockOrBlob, RangeRequestId, RpcEvent, SyncNetworkContext};
+use super::network_context::{
+    BlockOrBlob, CustodyByRootResult, RangeRequestId, RpcEvent, SyncNetworkContext,
+};
 use super::peer_sync_info::{remote_sync_type, PeerSyncType};
 use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH};
 use super::sampling::{Sampling, SamplingConfig, SamplingResult};
@@ -55,8 +57,8 @@ use beacon_chain::{
 use futures::StreamExt;
 use lighthouse_network::rpc::RPCError;
 use lighthouse_network::service::api_types::{
-    DataColumnsByRootRequestId, DataColumnsByRootRequester, Id, SamplingId, SamplingRequester,
-    SingleLookupReqId, SyncRequestId,
+    CustodyRequester, DataColumnsByRootRequestId, DataColumnsByRootRequester, Id, SamplingId,
+    SamplingRequester, SingleLookupReqId, SyncRequestId,
 };
 use lighthouse_network::types::{NetworkGlobals, SyncState};
 use lighthouse_network::SyncInfo;
@@ -368,6 +370,11 @@ impl<T: BeaconChainTypes> SyncManager<T> {
         }

         self.update_sync_state();
+
+        // Try to make progress on custody requests that are waiting for peers
+        for (id, result) in self.network.continue_custody_by_root_requests() {
+            self.on_custody_by_root_result(id, result);
+        }
     }

     /// Handles RPC errors related to requests that were emitted from the sync manager.
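
The hunk above is the heart of the change on the manager side: instead of hard-failing custody-by-root requests that have no custody peers, the sync manager now re-polls them every time a peer connects and forwards any finished results to on_custody_by_root_result. A standalone sketch of that "park and re-poll on peer events" pattern, with toy types (Requests, continue_all and the Id/ReqResult aliases are illustrative, not Lighthouse APIs):

    use std::collections::HashMap;

    type Id = u64;
    type ReqResult = Result<&'static str, &'static str>;

    struct Requests {
        // id -> attempts; stand-in for real per-request state
        pending: HashMap<Id, usize>,
    }

    impl Requests {
        /// Try to make progress on every pending request. Completed ones are
        /// removed and their results returned; the rest stay registered.
        fn continue_all(&mut self, peers_available: bool) -> Vec<(Id, ReqResult)> {
            let ids: Vec<Id> = self.pending.keys().copied().collect();
            let mut results = Vec::new();
            for id in ids {
                if peers_available {
                    self.pending.remove(&id);
                    results.push((id, Ok("downloaded")));
                }
                // else: leave the request registered, waiting for a custody peer
            }
            results
        }
    }

    fn main() {
        let mut reqs = Requests {
            pending: HashMap::from([(1, 0), (2, 0)]),
        };
        // No peers yet: nothing completes, both requests keep waiting.
        assert!(reqs.continue_all(false).is_empty());
        // A peer connects: the event handler re-polls and drains results.
        for (id, res) in reqs.continue_all(true) {
            println!("request {id} finished: {res:?}");
        }
    }
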
@@ -444,6 +451,16 @@
         self.update_sync_state();
     }

+    /// Prune stale requests that are waiting for peers
+    fn prune_requests(&mut self) {
+        // continue_custody_by_root_requests attempts to make progress on all requests. If some
+        // exceed the stale duration limit they will fail and return a result. Re-using
+        // `continue_custody_by_root_requests` is just a convenience to have less code.
+        for (id, result) in self.network.continue_custody_by_root_requests() {
+            self.on_custody_by_root_result(id, result);
+        }
+    }
+
     /// Updates the syncing state of a peer.
     /// Return whether the peer should be used for range syncing or not, according to its
     /// connection status.
@@ -624,6 +641,8 @@ impl<T: BeaconChainTypes> SyncManager<T> {
         // unless there is a bug.
         let mut prune_lookups_interval = tokio::time::interval(Duration::from_secs(15));

+        let mut prune_requests = tokio::time::interval(Duration::from_secs(15));
+
         let mut register_metrics_interval = tokio::time::interval(Duration::from_secs(5));

         // process any inbound messages
@@ -638,6 +657,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
                 _ = prune_lookups_interval.tick() => {
                     self.block_lookups.prune_lookups();
                 }
+                _ = prune_requests.tick() => {
+                    self.prune_requests();
+                }
                 _ = register_metrics_interval.tick() => {
                     self.network.register_metrics();
                 }
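
A note on the timer wiring for readers new to tokio: tokio::time::interval returns an Interval whose tick() future completes periodically, so the new select! arm gives the manager a 15-second heartbeat for prune_requests. A minimal, self-contained sketch assuming a tokio runtime with timers enabled (all names here are illustrative):

    use std::time::Duration;

    #[tokio::main]
    async fn main() {
        let mut prune_requests = tokio::time::interval(Duration::from_secs(15));
        // Stand-in for the manager's other event sources; stops the demo loop.
        let mut shutdown = Box::pin(tokio::time::sleep(Duration::from_secs(31)));
        loop {
            tokio::select! {
                _ = prune_requests.tick() => {
                    // The first tick fires immediately, then every 15 s.
                    // In the sync manager this arm calls self.prune_requests().
                    println!("tick: prune stale requests");
                }
                _ = &mut shutdown => break,
            }
        }
    }
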
@@ -1054,26 +1076,32 @@ impl<T: BeaconChainTypes> SyncManager<T> {
                     }
                 }
                 DataColumnsByRootRequester::Custody(custody_id) => {
-                    if let Some(custody_columns) = self
+                    if let Some(result) = self
                         .network
                         .on_custody_by_root_response(custody_id, req_id, peer_id, resp)
                     {
-                        // TODO(das): get proper timestamp
-                        let seen_timestamp = timestamp_now();
-                        self.block_lookups
-                            .on_download_response::<CustodyRequestState<T::EthSpec>>(
-                                custody_id.requester.0,
-                                custody_columns.map(|(columns, peer_group)| {
-                                    (columns, peer_group, seen_timestamp)
-                                }),
-                                &mut self.network,
-                            );
+                        self.on_custody_by_root_result(custody_id.requester, result);
                     }
                 }
             }
         }
     }

+    fn on_custody_by_root_result(
+        &mut self,
+        requester: CustodyRequester,
+        response: CustodyByRootResult<T::EthSpec>,
+    ) {
+        // TODO(das): get proper timestamp
+        let seen_timestamp = timestamp_now();
+        self.block_lookups
+            .on_download_response::<CustodyRequestState<T::EthSpec>>(
+                requester.0,
+                response.map(|(columns, peer_group)| (columns, peer_group, seen_timestamp)),
+                &mut self.network,
+            );
+    }
+
     fn on_sampling_result(&mut self, requester: SamplingRequester, result: SamplingResult) {
         // TODO(das): How is a consumer of sampling results?
         // - Fork-choice for trailing DA
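
This is the last hunk of the sync manager changes; the remaining hunks appear to belong to the sync network context and its custody sub-module. The refactor above extracts response handling into on_custody_by_root_result so the direct RPC response path and the new retry paths share one sink, which stamps a seen timestamp onto successes via Result::map. A sketch with stand-in types (Columns, PeerGroup and on_result are hypothetical):

    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    type Columns = Vec<u8>;
    type PeerGroup = Vec<&'static str>;

    fn timestamp_now() -> Duration {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
    }

    fn on_result(response: Result<(Columns, PeerGroup), String>) {
        let seen_timestamp = timestamp_now();
        // `map` only touches the Ok tuple; errors pass through untouched.
        let stamped = response.map(|(columns, peers)| (columns, peers, seen_timestamp));
        match stamped {
            Ok((columns, peers, ts)) => {
                println!("{} columns from {peers:?} at {ts:?}", columns.len())
            }
            Err(e) => println!("custody request failed: {e}"),
        }
    }

    fn main() {
        on_result(Ok((vec![1, 2, 3], vec!["peer_a", "peer_b"])));
        on_result(Err("no custody peers".into()));
    }
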
@@ -15,6 +15,7 @@ use crate::sync::block_lookups::SingleLookupId;
 use crate::sync::network_context::requests::BlobsByRootSingleBlockRequest;
 use beacon_chain::block_verification_types::RpcBlock;
 use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState};
+use custody::CustodyRequestResult;
 use fnv::FnvHashMap;
 use lighthouse_network::rpc::methods::{BlobsByRangeRequest, DataColumnsByRangeRequest};
 use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError};
@@ -69,6 +70,8 @@ pub enum RpcEvent<T> {

 pub type RpcResponseResult<T> = Result<(T, Duration), RpcResponseError>;

+pub type CustodyByRootResult<T> = Result<(DataColumnSidecarList<T>, PeerGroup), RpcResponseError>;
+
 #[derive(Debug)]
 pub enum RpcResponseError {
     RpcError(RPCError),
@@ -915,6 +918,32 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
             .insert(id, (sender_id, info));
     }

+    /// Attempt to make progress on all custody_by_root requests. Some request may be stale waiting
+    /// for custody peers. Returns a Vec of results as zero or more requests may fail in this
+    /// attempt.
+    pub fn continue_custody_by_root_requests(
+        &mut self,
+    ) -> Vec<(CustodyRequester, CustodyByRootResult<T::EthSpec>)> {
+        let ids = self
+            .custody_by_root_requests
+            .keys()
+            .copied()
+            .collect::<Vec<_>>();
+
+        // Need to collect ids and results in separate steps to re-borrow self.
+        ids.into_iter()
+            .filter_map(|id| {
+                let mut request = self
+                    .custody_by_root_requests
+                    .remove(&id)
+                    .expect("key of hashmap");
+                let result = request.continue_requests(self);
+                self.handle_custody_by_root_result(id, request, result)
+                    .map(|result| (id, result))
+            })
+            .collect()
+    }
+
     // Request handlers

     pub fn on_single_block_response(
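
The borrow choreography in continue_custody_by_root_requests is deliberate: each request's continue_requests(self) call itself needs &mut self, so the method first collects the map keys, then removes each entry before driving it, re-inserting it if it is still in flight. A toy reproduction of just that borrow structure (Ctx and continue_one are made up; only the collect-remove-reinsert shape mirrors the code above):

    use std::collections::HashMap;

    struct Ctx {
        requests: HashMap<u32, i32>,
    }

    impl Ctx {
        // Stand-in for continue_requests: advances state, Some(..) when done.
        fn continue_one(&mut self, state: &mut i32) -> Option<String> {
            *state += 1;
            (*state >= 2).then(|| format!("done after {state} steps"))
        }

        fn continue_all(&mut self) -> Vec<(u32, String)> {
            // Iterating `self.requests` directly while also passing `&mut self`
            // to `continue_one` would be a double mutable borrow, so collect ids.
            let ids: Vec<u32> = self.requests.keys().copied().collect();
            ids.into_iter()
                .filter_map(|id| {
                    let mut req = self.requests.remove(&id).expect("key of hashmap");
                    match self.continue_one(&mut req) {
                        Some(out) => Some((id, out)),
                        None => {
                            // Still in flight: put it back for a later pass.
                            self.requests.insert(id, req);
                            None
                        }
                    }
                })
                .collect()
        }
    }

    fn main() {
        let mut ctx = Ctx { requests: HashMap::from([(1, 0), (2, 1)]) };
        println!("{:?}", ctx.continue_all()); // request 2 finishes first
        println!("{:?}", ctx.continue_all()); // request 1 finishes on the next pass
    }
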
@@ -1069,7 +1098,7 @@
         req_id: DataColumnsByRootRequestId,
         peer_id: PeerId,
         resp: RpcResponseResult<Vec<Arc<DataColumnSidecar<T::EthSpec>>>>,
-    ) -> Option<Result<(DataColumnSidecarList<T::EthSpec>, PeerGroup), RpcResponseError>> {
+    ) -> Option<CustodyByRootResult<T::EthSpec>> {
         // Note: need to remove the request to borrow self again below. Otherwise we can't
         // do nested requests
         let Some(mut request) = self.custody_by_root_requests.remove(&id.requester) else {
@@ -1078,28 +1107,35 @@
             return None;
         };

-        let result = request
-            .on_data_column_downloaded(peer_id, req_id, resp, self)
+        let result = request.on_data_column_downloaded(peer_id, req_id, resp, self);
+        self.handle_custody_by_root_result(id.requester, request, result)
+    }
+
+    fn handle_custody_by_root_result(
+        &mut self,
+        id: CustodyRequester,
+        request: ActiveCustodyRequest<T>,
+        result: CustodyRequestResult<T::EthSpec>,
+    ) -> Option<CustodyByRootResult<T::EthSpec>> {
+        let result = result
             .map_err(RpcResponseError::CustodyRequestError)
             .transpose();

         // Convert a result from internal format of `ActiveCustodyRequest` (error first to use ?) to
         // an Option first to use in an `if let Some() { act on result }` block.
-        if let Some(result) = result {
-            match result.as_ref() {
-                Ok((columns, peer_group)) => {
-                    debug!(self.log, "Custody request success, removing"; "id" => ?id, "count" => columns.len(), "peers" => ?peer_group)
-                }
-                Err(e) => {
-                    debug!(self.log, "Custody request failure, removing"; "id" => ?id, "error" => ?e)
-                }
+        match result.as_ref() {
+            Some(Ok((columns, peer_group))) => {
+                debug!(self.log, "Custody request success, removing"; "id" => ?id, "count" => columns.len(), "peers" => ?peer_group)
+            }
+            Some(Err(e)) => {
+                debug!(self.log, "Custody request failure, removing"; "id" => ?id, "error" => ?e)
             }
-
-            Some(result)
-        } else {
-            self.custody_by_root_requests.insert(id.requester, request);
-            None
-        }
+            None => {
+                self.custody_by_root_requests.insert(id, request);
+            }
+        }
+
+        result
     }

     pub fn send_block_for_processing(
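
handle_custody_by_root_result bridges two conventions: ActiveCustodyRequest returns Result<Option<T>, E> (error-first, so its internals can use the ? operator), while callers want Option<Result<T, E>> where None means "still in flight". map_err converts the error type and transpose flips the nesting. A compact demonstration (internal_step is hypothetical):

    fn internal_step(done: bool, fail: bool) -> Result<Option<u32>, String> {
        if fail {
            return Err("no peers".to_string());
        }
        Ok(done.then_some(42))
    }

    fn main() {
        for (done, fail) in [(false, false), (true, false), (false, true)] {
            // `map_err` converts the error type; `transpose` flips
            // Result<Option<_>, _> into Option<Result<_, _>>.
            let result: Option<Result<u32, String>> = internal_step(done, fail)
                .map_err(|e| format!("rpc error: {e}"))
                .transpose();
            match result.as_ref() {
                Some(Ok(v)) => println!("finished: {v}"),
                Some(Err(e)) => println!("failed: {e}"),
                None => println!("still in flight, keep the request"),
            }
        }
    }
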
@@ -9,7 +9,7 @@ use lighthouse_network::PeerId;
 use lru_cache::LRUTimeCache;
 use rand::Rng;
 use slog::{debug, warn};
-use std::time::Duration;
+use std::time::{Duration, Instant};
 use std::{collections::HashMap, marker::PhantomData, sync::Arc};
 use types::EthSpec;
 use types::{data_column_sidecar::ColumnIndex, DataColumnSidecar, Hash256};
@@ -17,6 +17,7 @@ use types::{data_column_sidecar::ColumnIndex, DataColumnSidecar, Hash256};
 use super::{LookupRequestResult, PeerGroup, RpcResponseResult, SyncNetworkContext};

 const FAILED_PEERS_CACHE_EXPIRY_SECONDS: u64 = 5;
+const MAX_STALE_NO_PEERS_DURATION: Duration = Duration::from_secs(30);

 type DataColumnSidecarList<E> = Vec<Arc<DataColumnSidecar<E>>>;

@@ -56,7 +57,7 @@ struct ActiveBatchColumnsRequest {
     indices: Vec<ColumnIndex>,
 }

-type CustodyRequestResult<E> = Result<Option<(DataColumnSidecarList<E>, PeerGroup)>, Error>;
+pub type CustodyRequestResult<E> = Result<Option<(DataColumnSidecarList<E>, PeerGroup)>, Error>;

 impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
     pub(crate) fn new(
@@ -221,13 +222,13 @@
         // - which peer returned what to have PeerGroup attributability

         for (column_index, request) in self.column_requests.iter_mut() {
-            if request.is_awaiting_download() {
+            if let Some(wait_duration) = request.is_awaiting_download() {
                 if request.download_failures > MAX_CUSTODY_COLUMN_DOWNLOAD_ATTEMPTS {
                     return Err(Error::TooManyFailures);
                 }

-                // TODO: When is a fork and only a subset of your peers know about a block, we should only
-                // query the peers on that fork. Should this case be handled? How to handle it?
+                // TODO(das): When is a fork and only a subset of your peers know about a block, we should
+                // only query the peers on that fork. Should this case be handled? How to handle it?
                 let custodial_peers = cx.get_custodial_peers(*column_index);

                 // TODO(das): cache this computation in a OneCell or similar to prevent having to
@@ -256,17 +257,20 @@
                     .collect::<Vec<_>>();
                 priorized_peers.sort_unstable();

-                let Some((_, _, _, peer_id)) = priorized_peers.first() else {
-                    // Do not tolerate not having custody peers, hard error.
-                    // TODO(das): we might implement some grace period. The request will pause for X
-                    // seconds expecting the peer manager to find peers before failing the request.
-                    return Err(Error::NoPeers(*column_index));
-                };
-
-                columns_to_request_by_peer
-                    .entry(*peer_id)
-                    .or_default()
-                    .push(*column_index);
+                if let Some((_, _, _, peer_id)) = priorized_peers.first() {
+                    columns_to_request_by_peer
+                        .entry(*peer_id)
+                        .or_default()
+                        .push(*column_index);
+                } else if wait_duration > MAX_STALE_NO_PEERS_DURATION {
+                    // Allow to request to sit stale in `NotStarted` state for at most
+                    // `MAX_STALE_NO_PEERS_DURATION`, else error and drop the request. Note that
+                    // lookup will naturally retry when other peers send us attestations for
+                    // descendants of this un-available lookup.
+                    return Err(Error::NoPeers(*column_index));
+                } else {
+                    // Do not issue requests if there is no custody peer on this column
+                }
             }
         }

@@ -315,7 +319,7 @@ struct ColumnRequest<E: EthSpec> {

 #[derive(Debug, Clone)]
 enum Status<E: EthSpec> {
-    NotStarted,
+    NotStarted(Instant),
     Downloading(DataColumnsByRootRequestId),
     Downloaded(PeerId, Arc<DataColumnSidecar<E>>),
 }
@@ -323,28 +327,28 @@ enum Status<E: EthSpec> {
 impl<E: EthSpec> ColumnRequest<E> {
     fn new() -> Self {
         Self {
-            status: Status::NotStarted,
+            status: Status::NotStarted(Instant::now()),
             download_failures: 0,
         }
     }

-    fn is_awaiting_download(&self) -> bool {
+    fn is_awaiting_download(&self) -> Option<Duration> {
         match self.status {
-            Status::NotStarted => true,
-            Status::Downloading { .. } | Status::Downloaded { .. } => false,
+            Status::NotStarted(start_time) => Some(start_time.elapsed()),
+            Status::Downloading { .. } | Status::Downloaded { .. } => None,
         }
     }

     fn is_downloaded(&self) -> bool {
         match self.status {
-            Status::NotStarted | Status::Downloading { .. } => false,
+            Status::NotStarted { .. } | Status::Downloading { .. } => false,
             Status::Downloaded { .. } => true,
         }
     }

     fn on_download_start(&mut self, req_id: DataColumnsByRootRequestId) -> Result<(), Error> {
         match &self.status {
-            Status::NotStarted => {
+            Status::NotStarted { .. } => {
                 self.status = Status::Downloading(req_id);
                 Ok(())
             }
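
Storing an Instant inside Status::NotStarted is what lets is_awaiting_download report how long a column request has been waiting, which continue_requests compares against MAX_STALE_NO_PEERS_DURATION (30 s in this diff). A cut-down, self-contained miniature of that pattern (this Status is a stand-in, not the real enum):

    use std::time::{Duration, Instant};

    const MAX_STALE: Duration = Duration::from_secs(30);

    enum Status {
        NotStarted(Instant),
        Downloading,
    }

    impl Status {
        // Some(elapsed) while waiting to start; None once a download is running.
        fn awaiting_download(&self) -> Option<Duration> {
            match self {
                Status::NotStarted(since) => Some(since.elapsed()),
                Status::Downloading => None,
            }
        }
    }

    fn main() {
        let status = Status::NotStarted(Instant::now());
        if let Some(waited) = status.awaiting_download() {
            if waited > MAX_STALE {
                println!("stale for {waited:?}, dropping request");
            } else {
                println!("still within the grace period ({waited:?} elapsed)");
            }
        }
    }
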
@@ -363,7 +367,7 @@ impl<E: EthSpec> ColumnRequest<E> {
                         req_id,
                     });
                 }
-                self.status = Status::NotStarted;
+                self.status = Status::NotStarted(Instant::now());
                 Ok(())
             }
             other => Err(Error::BadState(format!(