Implement reliable range sync for PeerDAS

This commit is contained in:
dapplion
2025-05-21 23:34:28 -05:00
parent b014675b7a
commit 4fb2ae658a
23 changed files with 2580 additions and 701 deletions

View File

@@ -1,11 +1,11 @@
//! Provides network functionality for the Syncing thread. This fundamentally wraps a network
//! channel and stores a global RPC ID to perform requests.
use self::custody::{ActiveCustodyRequest, Error as CustodyRequestError};
use self::custody_by_range::{ActiveCustodyByRangeRequest, CustodyByRangeRequestResult};
use self::custody_by_root::{ActiveCustodyByRootRequest, CustodyByRootRequestResult};
pub use self::requests::{BlocksByRootSingleRequest, DataColumnsByRootSingleBlockRequest};
use super::block_sidecar_coupling::RangeBlockComponentsRequest;
use super::manager::BlockProcessType;
use super::range_sync::{BatchPeers, ByRangeRequestType};
use super::range_sync::BatchPeers;
use super::SyncMessage;
use crate::metrics;
use crate::network_beacon_processor::NetworkBeaconProcessor;
@@ -17,15 +17,17 @@ use crate::sync::block_lookups::SingleLookupId;
use crate::sync::network_context::requests::BlobsByRootSingleBlockRequest;
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState};
use custody::CustodyRequestResult;
pub use block_components_by_range::BlockComponentsByRangeRequest;
#[cfg(test)]
pub use block_components_by_range::BlockComponentsByRangeRequestStep;
use fnv::FnvHashMap;
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, DataColumnsByRangeRequest};
use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError, RequestType};
pub use lighthouse_network::service::api_types::RangeRequestId;
use lighthouse_network::service::api_types::{
AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId,
CustodyId, CustodyRequester, DataColumnsByRangeRequestId, DataColumnsByRootRequestId,
DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId,
CustodyByRangeRequestId, CustodyId, CustodyRequester, DataColumnsByRangeRequestId,
DataColumnsByRootRequestId, DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId,
};
use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource};
use parking_lot::RwLock;
@@ -36,7 +38,6 @@ use requests::{
};
#[cfg(test)]
use slot_clock::SlotClock;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::sync::Arc;
@@ -47,11 +48,13 @@ use tokio::sync::mpsc;
use tracing::{debug, error, span, warn, Level};
use types::blob_sidecar::FixedBlobSidecarList;
use types::{
BlobSidecar, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, ForkContext,
Hash256, SignedBeaconBlock, Slot,
BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec,
ForkContext, Hash256, SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
};
pub mod custody;
pub mod block_components_by_range;
pub mod custody_by_range;
pub mod custody_by_root;
mod requests;
#[derive(Debug)]
@@ -72,32 +75,29 @@ impl<T> RpcEvent<T> {
pub type RpcResponseResult<T> = Result<(T, Duration), RpcResponseError>;
/// Duration = latest seen timestamp of all received data columns
pub type CustodyByRootResult<T> =
Result<(DataColumnSidecarList<T>, PeerGroup, Duration), RpcResponseError>;
pub type RpcResponseBatchResult<T> = Result<(T, PeerGroup, Duration), RpcResponseError>;
#[derive(Debug)]
/// Duration = latest seen timestamp of all received data columns
pub type CustodyByRootResult<T> = RpcResponseBatchResult<DataColumnSidecarList<T>>;
pub type CustodyByRangeResult<T> = RpcResponseBatchResult<DataColumnSidecarList<T>>;
#[derive(Debug, Clone)]
pub enum RpcResponseError {
RpcError(#[allow(dead_code)] RPCError),
VerifyError(LookupVerifyError),
CustodyRequestError(#[allow(dead_code)] CustodyRequestError),
BlockComponentCouplingError(#[allow(dead_code)] String),
RequestExpired(String),
InternalError(#[allow(dead_code)] String),
}
#[derive(Debug, PartialEq, Eq)]
pub enum RpcRequestSendError {
/// No peer available matching the required criteria
NoPeer(NoPeerError),
/// These errors should never happen, including unreachable custody errors or network send
/// errors.
InternalError(String),
}
/// Type of peer missing that caused a `RpcRequestSendError::NoPeers`
#[derive(Debug, PartialEq, Eq)]
pub enum NoPeerError {
BlockPeer,
CustodyPeer(ColumnIndex),
// If RpcRequestSendError has a single variant `InternalError` it's to signal to downstream
// consumers that sends are expected to be infallible. If this assumption changes in the future,
// add a new variant.
}
#[derive(Debug, PartialEq, Eq)]
@@ -150,6 +150,17 @@ impl PeerGroup {
}
})
}
/// Builds the inverse view of this peer group: a map from column index to the
/// peer associated with that index.
///
/// If several peers are associated with the same index, the peer kept is the
/// last one visited; iteration order over the underlying map is unspecified.
pub fn as_reversed_map(&self) -> HashMap<u64, PeerId> {
// TODO(das): should we change PeerGroup to hold this map?
let mut index_to_peer = HashMap::<u64, PeerId>::new();
for (peer, indices) in self.peers.iter() {
for &index in indices {
index_to_peer.insert(index as u64, *peer);
}
}
index_to_peer
}
}
/// Sequential ID that uniquely identifies ReqResp outgoing requests
@@ -195,12 +206,15 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
data_columns_by_range_requests:
ActiveRequests<DataColumnsByRangeRequestId, DataColumnsByRangeRequestItems<T::EthSpec>>,
/// Mapping of active custody column requests for a block root
custody_by_root_requests: FnvHashMap<CustodyRequester, ActiveCustodyRequest<T>>,
/// Mapping of active custody column by root requests for a block root
custody_by_root_requests: FnvHashMap<CustodyRequester, ActiveCustodyByRootRequest<T>>,
/// Mapping of active custody column by range requests
custody_by_range_requests: FnvHashMap<CustodyByRangeRequestId, ActiveCustodyByRangeRequest<T>>,
/// BlocksByRange requests paired with other ByRange requests for data components
components_by_range_requests:
FnvHashMap<ComponentsByRangeRequestId, RangeBlockComponentsRequest<T::EthSpec>>,
block_components_by_range_requests:
FnvHashMap<ComponentsByRangeRequestId, BlockComponentsByRangeRequest<T>>,
/// Whether the ee is online. If it's not, we don't allow access to the
/// `beacon_processor_send`.
@@ -219,14 +233,17 @@ pub enum RangeBlockComponent<E: EthSpec> {
Block(
BlocksByRangeRequestId,
RpcResponseResult<Vec<Arc<SignedBeaconBlock<E>>>>,
PeerId,
),
Blob(
BlobsByRangeRequestId,
RpcResponseResult<Vec<Arc<BlobSidecar<E>>>>,
PeerId,
),
CustodyColumns(
DataColumnsByRangeRequestId,
CustodyByRangeRequestId,
RpcResponseResult<Vec<Arc<DataColumnSidecar<E>>>>,
PeerGroup,
),
}
@@ -283,7 +300,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
blobs_by_range_requests: ActiveRequests::new("blobs_by_range"),
data_columns_by_range_requests: ActiveRequests::new("data_columns_by_range"),
custody_by_root_requests: <_>::default(),
components_by_range_requests: FnvHashMap::default(),
custody_by_range_requests: <_>::default(),
block_components_by_range_requests: <_>::default(),
network_beacon_processor,
chain,
fork_context,
@@ -297,6 +315,14 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
/// Returns the ids of all the requests made to the given peer_id.
pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Vec<SyncRequestId> {
// Filter the full set of active requests down to those sent to this peer.
self.active_requests()
.filter(|(_, request_peer)| *request_peer == peer_id)
.map(|(id, _)| id)
.collect()
}
/// Returns the ids of all active requests
pub fn active_requests(&mut self) -> impl Iterator<Item = (SyncRequestId, &PeerId)> {
// Note: using destructuring pattern without a default case to make sure we don't forget to
// add new request types to this function. Otherwise, lookup sync can break and lookups
// will get stuck if a peer disconnects during an active request.
@@ -311,8 +337,9 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
data_columns_by_range_requests,
// custody_by_root_requests is a meta request of data_columns_by_root_requests
custody_by_root_requests: _,
custody_by_range_requests: _,
// components_by_range_requests is a meta request of various _by_range requests
components_by_range_requests: _,
block_components_by_range_requests: _,
execution_engine_state: _,
network_beacon_processor: _,
chain: _,
@@ -320,29 +347,23 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
} = self;
let blocks_by_root_ids = blocks_by_root_requests
.active_requests_of_peer(peer_id)
.into_iter()
.map(|id| SyncRequestId::SingleBlock { id: *id });
.active_requests()
.map(|(id, peer)| (SyncRequestId::SingleBlock { id: *id }, peer));
let blobs_by_root_ids = blobs_by_root_requests
.active_requests_of_peer(peer_id)
.into_iter()
.map(|id| SyncRequestId::SingleBlob { id: *id });
.active_requests()
.map(|(id, peer)| (SyncRequestId::SingleBlob { id: *id }, peer));
let data_column_by_root_ids = data_columns_by_root_requests
.active_requests_of_peer(peer_id)
.into_iter()
.map(|req_id| SyncRequestId::DataColumnsByRoot(*req_id));
.active_requests()
.map(|(id, peer)| (SyncRequestId::DataColumnsByRoot(*id), peer));
let blocks_by_range_ids = blocks_by_range_requests
.active_requests_of_peer(peer_id)
.into_iter()
.map(|req_id| SyncRequestId::BlocksByRange(*req_id));
.active_requests()
.map(|(id, peer)| (SyncRequestId::BlocksByRange(*id), peer));
let blobs_by_range_ids = blobs_by_range_requests
.active_requests_of_peer(peer_id)
.into_iter()
.map(|req_id| SyncRequestId::BlobsByRange(*req_id));
.active_requests()
.map(|(id, peer)| (SyncRequestId::BlobsByRange(*id), peer));
let data_column_by_range_ids = data_columns_by_range_requests
.active_requests_of_peer(peer_id)
.into_iter()
.map(|req_id| SyncRequestId::DataColumnsByRange(*req_id));
.active_requests()
.map(|(id, peer)| (SyncRequestId::DataColumnsByRange(*id), peer));
blocks_by_root_ids
.chain(blobs_by_root_ids)
@@ -350,6 +371,18 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.chain(blocks_by_range_ids)
.chain(blobs_by_range_ids)
.chain(data_column_by_range_ids)
}
#[cfg(test)]
/// Test-only introspection: returns a snapshot of each active
/// block-components-by-range request id together with its current state step.
pub fn active_block_components_by_range_requests(
&self,
) -> Vec<(
ComponentsByRangeRequestId,
BlockComponentsByRangeRequestStep,
)> {
self.block_components_by_range_requests
.iter()
.map(|(id, req)| (*id, req.state_step()))
.collect()
}
@@ -362,6 +395,10 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
&self.network_beacon_processor.network_globals
}
/// Returns the chain spec of the attached beacon chain.
pub fn spec(&self) -> &ChainSpec {
&self.chain.spec
}
/// Returns the Client type of the peer if known
pub fn client_type(&self, peer_id: &PeerId) -> Client {
self.network_globals()
@@ -414,8 +451,9 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
data_columns_by_range_requests,
// custody_by_root_requests is a meta request of data_columns_by_root_requests
custody_by_root_requests: _,
custody_by_range_requests: _,
// components_by_range_requests is a meta request of various _by_range requests
components_by_range_requests: _,
block_components_by_range_requests: _,
execution_engine_state: _,
network_beacon_processor: _,
chain: _,
@@ -447,205 +485,95 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
requester: RangeRequestId,
peers: &HashSet<PeerId>,
peers_to_deprioritize: &HashSet<PeerId>,
total_requests_per_peer: &HashMap<PeerId, usize>,
) -> Result<Id, RpcRequestSendError> {
let batch_epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch());
let batch_type = self.batch_type(batch_epoch);
let active_request_count_by_peer = self.active_request_count_by_peer();
let Some(block_peer) = peers
.iter()
.map(|peer| {
(
// If contains -> 1 (order after), not contains -> 0 (order first)
peers_to_deprioritize.contains(peer),
// Prefer peers with less overall requests
active_request_count_by_peer.get(peer).copied().unwrap_or(0),
// Random factor to break ties, otherwise the PeerID breaks ties
rand::random::<u32>(),
peer,
)
})
.min()
.map(|(_, _, _, peer)| *peer)
else {
// Backfill and forward sync handle this condition gracefully.
// - Backfill sync: will pause waiting for more peers to join
// - Forward sync: can never happen as the chain is dropped when removing the last peer.
return Err(RpcRequestSendError::NoPeer(NoPeerError::BlockPeer));
};
// Attempt to find all required custody peers before sending any request or creating an ID
let columns_by_range_peers_to_request =
if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) {
let column_indexes = self.network_globals().sampling_columns.clone();
Some(self.select_columns_by_range_peers_to_request(
&column_indexes,
peers,
active_request_count_by_peer,
peers_to_deprioritize,
)?)
} else {
None
};
// Create the overall components_by_range request ID before its individual components
let id = ComponentsByRangeRequestId {
id: self.next_id(),
requester,
};
let blocks_req_id = self.send_blocks_by_range_request(block_peer, request.clone(), id)?;
let req = BlockComponentsByRangeRequest::new(
id,
request,
peers,
peers_to_deprioritize,
total_requests_per_peer,
self,
)?;
let blobs_req_id = if matches!(batch_type, ByRangeRequestType::BlocksAndBlobs) {
Some(self.send_blobs_by_range_request(
block_peer,
BlobsByRangeRequest {
start_slot: *request.start_slot(),
count: *request.count(),
},
id,
)?)
} else {
None
};
let data_column_requests = columns_by_range_peers_to_request
.map(|columns_by_range_peers_to_request| {
let column_to_peer_map = columns_by_range_peers_to_request
.iter()
.flat_map(|(peer_id, columns)| columns.iter().map(|column| (*column, *peer_id)))
.collect::<HashMap<ColumnIndex, PeerId>>();
let requests = columns_by_range_peers_to_request
.into_iter()
.map(|(peer_id, columns)| {
self.send_data_columns_by_range_request(
peer_id,
DataColumnsByRangeRequest {
start_slot: *request.start_slot(),
count: *request.count(),
columns,
},
id,
)
})
.collect::<Result<Vec<_>, _>>()?;
Ok((requests, column_to_peer_map))
})
.transpose()?;
let info =
RangeBlockComponentsRequest::new(blocks_req_id, blobs_req_id, data_column_requests);
self.components_by_range_requests.insert(id, info);
self.block_components_by_range_requests.insert(id, req);
// TODO: use ID
Ok(id.id)
}
/// For each custody column index, selects a peer from `peers` that custodies that
/// column, batching columns assigned to the same peer. Selection prefers peers not in
/// `peers_to_deprioritize`, then peers with fewer active requests, with a random
/// tie-break. Errors with `NoPeer(CustodyPeer(index))` on the first column for which
/// no eligible custody peer exists.
fn select_columns_by_range_peers_to_request(
&self,
custody_indexes: &HashSet<ColumnIndex>,
peers: &HashSet<PeerId>,
active_request_count_by_peer: HashMap<PeerId, usize>,
peers_to_deprioritize: &HashSet<PeerId>,
) -> Result<HashMap<PeerId, Vec<ColumnIndex>>, RpcRequestSendError> {
let mut columns_to_request_by_peer = HashMap::<PeerId, Vec<ColumnIndex>>::new();
for column_index in custody_indexes {
// Strictly consider peers that are custodials of this column AND are part of this
// syncing chain. If the forward range sync chain has few peers, it's likely that this
// function will not be able to find peers on our custody columns.
let Some(custody_peer) = peers
.iter()
.filter(|peer| {
self.network_globals()
.is_custody_peer_of(*column_index, peer)
})
.map(|peer| {
(
// If contains -> 1 (order after), not contains -> 0 (order first)
peers_to_deprioritize.contains(peer),
// Prefer peers with less overall requests.
// Also account for requests that are not yet issued, tracked in
// `columns_to_request_by_peer`. We batch requests to the same peer, so
// count existence in that map as a single 1 request.
active_request_count_by_peer.get(peer).copied().unwrap_or(0)
+ columns_to_request_by_peer.get(peer).map(|_| 1).unwrap_or(0),
// Random factor to break ties, otherwise the PeerID breaks ties
rand::random::<u32>(),
peer,
)
})
.min()
.map(|(_, _, _, peer)| *peer)
else {
// TODO(das): this will be pretty bad UX. To improve we should:
// - Handle the no peers case gracefully, maybe add some timeout and give a few
// minutes / seconds to the peer manager to locate peers on this subnet before
// abandoning progress on the chain completely.
return Err(RpcRequestSendError::NoPeer(NoPeerError::CustodyPeer(
*column_index,
)));
};
columns_to_request_by_peer
.entry(custody_peer)
.or_default()
.push(*column_index);
}
Ok(columns_to_request_by_peer)
}
/// Received a _by_range response for a request that couples blocks and its data
///
/// `peer_id` is the peer that served this individual RPC _by_range response.
/// Received a blocks by range or blobs by range response for a request that couples blocks
/// and blobs.
#[allow(clippy::type_complexity)]
pub fn range_block_component_response(
pub fn on_block_components_by_range_response(
&mut self,
id: ComponentsByRangeRequestId,
peer_id: PeerId,
range_block_component: RangeBlockComponent<T::EthSpec>,
) -> Option<Result<(Vec<RpcBlock<T::EthSpec>>, BatchPeers), RpcResponseError>> {
let Entry::Occupied(mut entry) = self.components_by_range_requests.entry(id) else {
metrics::inc_counter_vec(&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, &["range_blocks"]);
// Note: need to remove the request to borrow self again below. Otherwise we can't
// do nested requests
let Some(mut request) = self.block_components_by_range_requests.remove(&id) else {
metrics::inc_counter_vec(
&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS,
&["block_components_by_range"],
);
return None;
};
if let Err(e) = {
let request = entry.get_mut();
match range_block_component {
RangeBlockComponent::Block(req_id, resp) => resp.and_then(|(blocks, _)| {
let result = match range_block_component {
RangeBlockComponent::Block(req_id, resp, peer_id) => resp.and_then(|(blocks, _)| {
request
.on_blocks_by_range_result(req_id, blocks, peer_id, self)
.map_err(Into::<RpcResponseError>::into)
}),
RangeBlockComponent::Blob(req_id, resp, peer_id) => resp.and_then(|(blobs, _)| {
request
.on_blobs_by_range_result(req_id, blobs, peer_id, self)
.map_err(Into::<RpcResponseError>::into)
}),
RangeBlockComponent::CustodyColumns(req_id, resp, peers) => {
resp.and_then(|(custody_columns, _)| {
request
.add_blocks(req_id, blocks, peer_id)
.map_err(RpcResponseError::BlockComponentCouplingError)
}),
RangeBlockComponent::Blob(req_id, resp) => resp.and_then(|(blobs, _)| {
request
.add_blobs(req_id, blobs, peer_id)
.map_err(RpcResponseError::BlockComponentCouplingError)
}),
RangeBlockComponent::CustodyColumns(req_id, resp) => {
resp.and_then(|(custody_columns, _)| {
request
.add_custody_columns(req_id, custody_columns, peer_id)
.map_err(RpcResponseError::BlockComponentCouplingError)
})
}
.on_custody_by_range_result(req_id, custody_columns, peers, self)
.map_err(Into::<RpcResponseError>::into)
})
}
} {
entry.remove();
return Some(Err(e));
}
};
if let Some(blocks_result) = entry.get().responses(&self.chain.spec) {
entry.remove();
// If the request is finished, dequeue everything
Some(blocks_result.map_err(RpcResponseError::BlockComponentCouplingError))
} else {
None
let result = result.transpose();
// Convert a result from internal format of `ActiveCustodyRequest` (error first to use ?) to
// an Option first to use in an `if let Some() { act on result }` block.
match result.as_ref() {
Some(Ok((blocks, peer_group))) => {
let blocks_with_data = blocks
.iter()
.filter(|block| block.as_block().has_data())
.count();
// Don't log the peer_group here, it's very long (could be up to 128 peers). If you
// want to trace which peer sent the column at index X, search for the log:
// `Sync RPC request sent method="DataColumnsByRange" ...`
debug!(
%id,
blocks = blocks.len(),
blocks_with_data,
block_peer = ?peer_group.block(),
"Block components by range request success, removing"
)
}
Some(Err(e)) => {
debug!(%id, error = ?e, "Block components by range request failure, removing" )
}
None => {
self.block_components_by_range_requests.insert(id, request);
}
}
result
}
/// Request block of `block_root` if necessary by checking:
@@ -853,7 +781,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
}
/// Request to send a single `data_columns_by_root` request to the network.
pub fn data_column_lookup_request(
pub fn data_columns_by_root_request(
&mut self,
requester: DataColumnsByRootRequester,
peer_id: PeerId,
@@ -951,7 +879,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
);
let requester = CustodyRequester(id);
let mut request = ActiveCustodyRequest::new(
let mut request = ActiveCustodyByRootRequest::new(
block_root,
CustodyId { requester },
&custody_indexes_to_fetch,
@@ -967,25 +895,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.custody_by_root_requests.insert(requester, request);
Ok(LookupRequestResult::RequestSent(id.req_id))
}
Err(e) => Err(match e {
CustodyRequestError::NoPeer(column_index) => {
RpcRequestSendError::NoPeer(NoPeerError::CustodyPeer(column_index))
}
// - TooManyFailures: Should never happen, `request` has just been created; its
// count of download_failures is 0 here
// - BadState: Should never happen, a bad state can only happen when handling a
// network response
// - UnexpectedRequestId: Never happens: this Err is only constructed handling a
// download or processing response
// - SendFailed: Should never happen unless in a bad drop sequence when shutting
// down the node
e @ (CustodyRequestError::TooManyFailures
| CustodyRequestError::BadState { .. }
| CustodyRequestError::UnexpectedRequestId { .. }
| CustodyRequestError::SendFailed { .. }) => {
RpcRequestSendError::InternalError(format!("{e:?}"))
}
}),
Err(e) => Err(e.into()),
}
}
@@ -1073,8 +983,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
&mut self,
peer_id: PeerId,
request: DataColumnsByRangeRequest,
parent_request_id: ComponentsByRangeRequestId,
) -> Result<DataColumnsByRangeRequestId, RpcRequestSendError> {
parent_request_id: CustodyByRangeRequestId,
) -> Result<DataColumnsByRangeRequestId, &'static str> {
let id = DataColumnsByRangeRequestId {
id: self.next_id(),
parent_request_id,
@@ -1085,7 +995,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
request: RequestType::DataColumnsByRange(request.clone()),
app_request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRange(id)),
})
.map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?;
.map_err(|_| "network send error")?;
debug!(
method = "DataColumnsByRange",
@@ -1108,6 +1018,50 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
Ok(id)
}
/// Request to fetch all needed custody columns of a range of slot. This function may not send
/// any request to the network if no columns have to be fetched based on the import state of the
/// node. A custody request is a "super request" that may trigger 0 or more `data_columns_by_range`
/// requests.
pub fn send_custody_by_range_request(
&mut self,
parent_id: ComponentsByRangeRequestId,
blocks_with_data: Vec<SignedBeaconBlockHeader>,
epoch: Epoch,
column_indices: Vec<ColumnIndex>,
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
) -> Result<CustodyByRangeRequestId, RpcRequestSendError> {
// Mint a new id tied to the parent components_by_range request.
let id = CustodyByRangeRequestId {
id: self.next_id(),
parent_request_id: parent_id,
};
debug!(
indices = ?column_indices,
%id,
"Starting custody columns by range request"
);
let mut request = ActiveCustodyByRangeRequest::new(
id,
epoch,
blocks_with_data,
&column_indices,
lookup_peers,
);
// Note that you can only send, but not handle a response here
match request.continue_requests(self) {
Ok(_) => {
// Ignoring the result of `continue_requests` is okay. A request that has just been
// created cannot return data immediately, it must send some request to the network
// first. And there must exist some request, `column_indices` is not empty.
self.custody_by_range_requests.insert(id, request);
Ok(id)
}
Err(e) => Err(e.into()),
}
}
pub fn is_execution_engine_online(&self) -> bool {
self.execution_engine_state == EngineState::Online
}
@@ -1212,34 +1166,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
id
}
/// Check whether a batch for this epoch (and only this epoch) should request just blocks,
/// blocks and blobs, or blocks and data columns.
fn batch_type(&self, epoch: types::Epoch) -> ByRangeRequestType {
// Induces a compile time panic if this doesn't hold true.
#[allow(clippy::assertions_on_constants)]
const _: () = assert!(
super::backfill_sync::BACKFILL_EPOCHS_PER_BATCH == 1
&& super::range_sync::EPOCHS_PER_BATCH == 1,
"To deal with alignment with deneb boundaries, batches need to be of just one epoch"
);
// Data columns take precedence over blobs when both checks could apply.
if self
.chain
.data_availability_checker
.data_columns_required_for_epoch(epoch)
{
ByRangeRequestType::BlocksAndColumns
} else if self
.chain
.data_availability_checker
.blobs_required_for_epoch(epoch)
{
ByRangeRequestType::BlocksAndBlobs
} else {
ByRangeRequestType::Blocks
}
}
/// Attempt to make progress on all custody_by_root requests. Some request may be stale waiting
/// for custody peers. Returns a Vec of results as zero or more requests may fail in this
/// attempt.
@@ -1266,6 +1192,32 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.collect()
}
/// Attempt to make progress on all custody_by_range requests. Some request may be stale waiting
/// for custody peers. Returns a Vec of results as zero or more requests may fail in this
/// attempt.
pub fn continue_custody_by_range_requests(
&mut self,
) -> Vec<(CustodyByRangeRequestId, CustodyByRangeResult<T::EthSpec>)> {
let ids = self
.custody_by_range_requests
.keys()
.copied()
.collect::<Vec<_>>();
// Need to collect ids and results in separate steps to re-borrow self.
ids.into_iter()
.filter_map(|id| {
// Remove the request so `self` can be re-borrowed mutably by
// `continue_requests`; `handle_custody_by_range_result` re-inserts it
// if it is still in progress.
let mut request = self
.custody_by_range_requests
.remove(&id)
.expect("key of hashmap");
let result = request.continue_requests(self);
self.handle_custody_by_range_result(id, request, result)
.map(|result| (id, result))
})
.collect()
}
// Request handlers
pub(crate) fn on_single_block_response(
@@ -1425,8 +1377,10 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
// Note: need to remove the request to borrow self again below. Otherwise we can't
// do nested requests
let Some(mut request) = self.custody_by_root_requests.remove(&id.requester) else {
// TODO(das): This log can happen if the request is error'ed early and dropped
debug!(?id, "Custody column downloaded event for unknown request");
metrics::inc_counter_vec(
&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS,
&["custody_by_root"],
);
return None;
};
@@ -1438,8 +1392,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
fn handle_custody_by_root_result(
&mut self,
id: CustodyRequester,
request: ActiveCustodyRequest<T>,
result: CustodyRequestResult<T::EthSpec>,
request: ActiveCustodyByRootRequest<T>,
result: CustodyByRootRequestResult<T::EthSpec>,
) -> Option<CustodyByRootResult<T::EthSpec>> {
let span = span!(
Level::INFO,
@@ -1448,18 +1402,16 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
);
let _enter = span.enter();
let result = result
.map_err(RpcResponseError::CustodyRequestError)
.transpose();
let result = result.map_err(Into::<RpcResponseError>::into).transpose();
// Convert a result from internal format of `ActiveCustodyRequest` (error first to use ?) to
// an Option first to use in an `if let Some() { act on result }` block.
match result.as_ref() {
Some(Ok((columns, peer_group, _))) => {
debug!(?id, count = columns.len(), peers = ?peer_group, "Custody request success, removing")
debug!(%id, count = columns.len(), peers = ?peer_group, "Custody by root request success, removing")
}
Some(Err(e)) => {
debug!(?id, error = ?e, "Custody request failure, removing" )
debug!(%id, error = ?e, "Custody by root request failure, removing" )
}
None => {
self.custody_by_root_requests.insert(id, request);
@@ -1468,6 +1420,61 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
result
}
/// Insert a downloaded column into an active custody request. Then make progress on the
/// entire request.
///
/// ### Returns
///
/// - `Some`: Request completed, won't make more progress. Expect requester to act on the result.
/// - `None`: Request still active, requester should do no action
#[allow(clippy::type_complexity)]
pub fn on_custody_by_range_response(
&mut self,
id: CustodyByRangeRequestId,
req_id: DataColumnsByRangeRequestId,
peer_id: PeerId,
resp: RpcResponseResult<Vec<Arc<DataColumnSidecar<T::EthSpec>>>>,
) -> Option<CustodyByRootResult<T::EthSpec>> {
// Note: need to remove the request to borrow self again below. Otherwise we can't
// do nested requests
let Some(mut request) = self.custody_by_range_requests.remove(&id) else {
// TOOD(das): This log can happen if the request is error'ed early and dropped
debug!(%id, "Custody by range downloaded event for unknown request");
return None;
};
let result = request.on_data_column_downloaded(peer_id, req_id, resp, self);
self.handle_custody_by_range_result(id, request, result)
}
/// Handle the outcome of a custody_by_range request: log success or failure, or
/// re-insert the request into `custody_by_range_requests` if it is still in progress.
///
/// Returns `Some` when the request completed (success or failure) and was dropped,
/// `None` when the request remains active.
fn handle_custody_by_range_result(
&mut self,
id: CustodyByRangeRequestId,
request: ActiveCustodyByRangeRequest<T>,
result: CustodyByRangeRequestResult<T::EthSpec>,
) -> Option<CustodyByRangeResult<T::EthSpec>> {
let result = result.map_err(Into::<RpcResponseError>::into).transpose();
// Convert a result from internal format of `ActiveCustodyByRangeRequest` (error first
// to use ?) to an Option first to use in an `if let Some() { act on result }` block.
match result.as_ref() {
Some(Ok((columns, _peer_group, _))) => {
// Don't log the peer_group here, it's very long (could be up to 128 peers). If you
// want to trace which peer sent the column at index X, search for the log:
// `Sync RPC request sent method="DataColumnsByRange" ...`
debug!(%id, count = columns.len(), "Custody by range request success, removing")
}
Some(Err(e)) => {
debug!(%id, error = ?e, "Custody by range request failure, removing" )
}
None => {
self.custody_by_range_requests.insert(id, request);
}
}
result
}
pub fn send_block_for_processing(
&self,
id: Id,
@@ -1529,7 +1536,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.beacon_processor_if_enabled()
.ok_or(SendErrorProcessor::ProcessorNotAvailable)?;
debug!(?block_root, ?id, "Sending blobs for processing");
debug!(?block_root, %id, "Sending blobs for processing");
// Lookup sync event safety: If `beacon_processor.send_rpc_blobs` returns Ok() sync
// must receive a single `SyncMessage::BlockComponentProcessed` event with this process type
beacon_processor
@@ -1600,8 +1607,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
),
("custody_by_root", self.custody_by_root_requests.len()),
(
"components_by_range",
self.components_by_range_requests.len(),
"block_components_by_range",
self.block_components_by_range_requests.len(),
),
] {
metrics::set_gauge_vec(&metrics::SYNC_ACTIVE_NETWORK_REQUESTS, &[id], count as i64);