Implement PeerDAS RPC handlers (#6237)

* Implement PeerDAS RPC handlers

* use terminate_response_stream

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into peerdas-network-rpc-handler

* cargo fmt
This commit is contained in:
Lion - dapplion
2024-08-15 23:15:09 +02:00
committed by GitHub
parent 9fc0a662c3
commit 6566705505
6 changed files with 313 additions and 11 deletions

View File

@@ -320,16 +320,66 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
pub fn handle_data_columns_by_root_request(
self: Arc<Self>,
peer_id: PeerId,
_request_id: PeerRequestId,
request_id: PeerRequestId,
request: DataColumnsByRootRequest,
) {
// TODO(das): implement handler
debug!(self.log, "Received DataColumnsByRoot Request";
"peer_id" => %peer_id,
"count" => request.data_column_ids.len()
self.terminate_response_stream(
peer_id,
request_id,
self.handle_data_columns_by_root_request_inner(peer_id, request_id, request),
Response::DataColumnsByRoot,
);
}
/// Handle a `DataColumnsByRoot` request from the peer.
pub fn handle_data_columns_by_root_request_inner(
&self,
peer_id: PeerId,
request_id: PeerRequestId,
request: DataColumnsByRootRequest,
) -> Result<(), (RPCResponseErrorCode, &'static str)> {
let mut send_data_column_count = 0;
for data_column_id in request.data_column_ids.as_slice() {
match self.chain.get_data_column_checking_all_caches(
data_column_id.block_root,
data_column_id.index,
) {
Ok(Some(data_column)) => {
send_data_column_count += 1;
self.send_response(
peer_id,
Response::DataColumnsByRoot(Some(data_column)),
request_id,
);
}
Ok(None) => {} // no-op
Err(e) => {
// TODO(das): lower log level when feature is stabilized
error!(self.log, "Error getting data column";
"block_root" => ?data_column_id.block_root,
"peer" => %peer_id,
"error" => ?e
);
return Err((
RPCResponseErrorCode::ServerError,
"Error getting data column",
));
}
}
}
debug!(
self.log,
"Received DataColumnsByRoot Request";
"peer" => %peer_id,
"request" => ?request.group_by_ordered_block_root(),
"returned" => send_data_column_count
);
Ok(())
}
/// Handle a `LightClientBootstrap` request from the peer.
pub fn handle_light_client_bootstrap(
self: &Arc<Self>,
@@ -833,17 +883,196 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
/// Handle a `DataColumnsByRange` request from the peer.
pub fn handle_data_columns_by_range_request(
self: Arc<Self>,
&self,
peer_id: PeerId,
_request_id: PeerRequestId,
request_id: PeerRequestId,
req: DataColumnsByRangeRequest,
) {
// TODO(das): implement handler
self.terminate_response_stream(
peer_id,
request_id,
self.handle_data_columns_by_range_request_inner(peer_id, request_id, req),
Response::DataColumnsByRange,
);
}
/// Handle a `DataColumnsByRange` request from the peer.
pub fn handle_data_columns_by_range_request_inner(
&self,
peer_id: PeerId,
request_id: PeerRequestId,
req: DataColumnsByRangeRequest,
) -> Result<(), (RPCResponseErrorCode, &'static str)> {
debug!(self.log, "Received DataColumnsByRange Request";
"peer_id" => %peer_id,
"count" => req.count,
"start_slot" => req.start_slot,
);
// Should not send more than max request data columns
if req.max_requested::<T::EthSpec>() > self.chain.spec.max_request_data_column_sidecars {
return Err((
RPCResponseErrorCode::InvalidRequest,
"Request exceeded `MAX_REQUEST_BLOBS_SIDECARS`",
));
}
let request_start_slot = Slot::from(req.start_slot);
let data_availability_boundary_slot = match self.chain.data_availability_boundary() {
Some(boundary) => boundary.start_slot(T::EthSpec::slots_per_epoch()),
None => {
debug!(self.log, "Deneb fork is disabled");
return Err((
RPCResponseErrorCode::InvalidRequest,
"Deneb fork is disabled",
));
}
};
let oldest_data_column_slot = self
.chain
.store
.get_data_column_info()
.oldest_data_column_slot
.unwrap_or(data_availability_boundary_slot);
if request_start_slot < oldest_data_column_slot {
debug!(
self.log,
"Range request start slot is older than data availability boundary.";
"requested_slot" => request_start_slot,
"oldest_data_column_slot" => oldest_data_column_slot,
"data_availability_boundary" => data_availability_boundary_slot
);
return if data_availability_boundary_slot < oldest_data_column_slot {
Err((
RPCResponseErrorCode::ResourceUnavailable,
"blobs pruned within boundary",
))
} else {
Err((
RPCResponseErrorCode::InvalidRequest,
"Req outside availability period",
))
};
}
let forwards_block_root_iter =
match self.chain.forwards_iter_block_roots(request_start_slot) {
Ok(iter) => iter,
Err(BeaconChainError::HistoricalBlockError(
HistoricalBlockError::BlockOutOfRange {
slot,
oldest_block_slot,
},
)) => {
debug!(self.log, "Range request failed during backfill";
"requested_slot" => slot,
"oldest_known_slot" => oldest_block_slot
);
return Err((RPCResponseErrorCode::ResourceUnavailable, "Backfilling"));
}
Err(e) => {
error!(self.log, "Unable to obtain root iter";
"request" => ?req,
"peer" => %peer_id,
"error" => ?e
);
return Err((RPCResponseErrorCode::ServerError, "Database error"));
}
};
// Use `WhenSlotSkipped::Prev` to get the most recent block root prior to
// `request_start_slot` in order to check whether the `request_start_slot` is a skip.
let mut last_block_root = req.start_slot.checked_sub(1).and_then(|prev_slot| {
self.chain
.block_root_at_slot(Slot::new(prev_slot), WhenSlotSkipped::Prev)
.ok()
.flatten()
});
// Pick out the required blocks, ignoring skip-slots.
let maybe_block_roots = process_results(forwards_block_root_iter, |iter| {
iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count))
// map skip slots to None
.map(|(root, _)| {
let result = if Some(root) == last_block_root {
None
} else {
Some(root)
};
last_block_root = Some(root);
result
})
.collect::<Vec<Option<Hash256>>>()
});
let block_roots = match maybe_block_roots {
Ok(block_roots) => block_roots,
Err(e) => {
error!(self.log, "Error during iteration over blocks";
"request" => ?req,
"peer" => %peer_id,
"error" => ?e
);
return Err((RPCResponseErrorCode::ServerError, "Database error"));
}
};
// remove all skip slots
let block_roots = block_roots.into_iter().flatten();
let mut data_columns_sent = 0;
for root in block_roots {
for index in &req.columns {
match self.chain.get_data_column(&root, index) {
Ok(Some(data_column_sidecar)) => {
data_columns_sent += 1;
self.send_network_message(NetworkMessage::SendResponse {
peer_id,
response: Response::DataColumnsByRange(Some(
data_column_sidecar.clone(),
)),
id: request_id,
});
}
Ok(None) => {} // no-op
Err(e) => {
error!(
self.log,
"Error fetching data columns block root";
"request" => ?req,
"peer" => %peer_id,
"block_root" => ?root,
"error" => ?e
);
return Err((
RPCResponseErrorCode::ServerError,
"No data columns and failed fetching corresponding block",
));
}
}
}
}
let current_slot = self
.chain
.slot()
.unwrap_or_else(|_| self.chain.slot_clock.genesis_slot());
debug!(
self.log,
"DataColumnsByRange Response processed";
"peer" => %peer_id,
"start_slot" => req.start_slot,
"current_slot" => current_slot,
"requested" => req.count,
"returned" => data_columns_sent
);
Ok(())
}
/// Helper function to ensure single item protocol always end with either a single chunk or an