Fix misc PeerDAS todos (#6862)
Address misc PeerDAS TODOs that are not too big for a dedicated PR. I'll justify each TODO in an inline comment.
@@ -2974,10 +2974,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
     /// Only completed sampling results are received. Blocks are unavailable by default and should
     /// be pruned on finalization, on a timeout or by a max count.
     pub async fn process_sampling_completed(self: &Arc<Self>, block_root: Hash256) {
-        // TODO(das): update fork-choice
+        // TODO(das): update fork-choice, act on sampling result, adjust log level
         // NOTE: It is possible that sampling completes before the block is imported into fork
         // choice, in which case we may need to update the availability cache.
-        // TODO(das): These log levels are too high, reduce once DAS matures
        info!(self.log, "Sampling completed"; "block_root" => %block_root);
    }

@@ -110,15 +110,6 @@ impl<E: EthSpec> PendingComponents<E> {
         self.get_cached_blobs().iter().flatten().count()
     }

-    /// Checks if a data column of a given index exists in the cache.
-    ///
-    /// Returns:
-    /// - `true` if a data column for the given index exists.
-    /// - `false` otherwise.
-    fn data_column_exists(&self, data_column_index: u64) -> bool {
-        self.get_cached_data_column(data_column_index).is_some()
-    }
-
     /// Returns the number of data columns that have been received and are stored in the cache.
     pub fn num_received_data_columns(&self) -> usize {
         self.verified_data_columns.len()

@@ -182,8 +173,7 @@ impl<E: EthSpec> PendingComponents<E> {
         kzg_verified_data_columns: I,
     ) -> Result<(), AvailabilityCheckError> {
         for data_column in kzg_verified_data_columns {
-            // TODO(das): Add equivalent checks for data columns if necessary
-            if !self.data_column_exists(data_column.index()) {
+            if self.get_cached_data_column(data_column.index()).is_none() {
                 self.verified_data_columns.push(data_column);
             }
         }

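The inlined check makes `merge_data_columns` an insert-if-absent keyed by column index. A minimal standalone sketch of that idempotent-insert pattern, using a simplified cache in place of `PendingComponents` (the `ColumnCache` type and `Vec<u8>` payload are illustrative assumptions, not Lighthouse's real types):

// Simplified stand-in for PendingComponents: a cache of verified data
// columns that ignores duplicate column indices.
struct ColumnCache {
    verified_data_columns: Vec<(u64, Vec<u8>)>, // (column index, payload)
}

impl ColumnCache {
    fn get_cached_data_column(&self, index: u64) -> Option<&Vec<u8>> {
        self.verified_data_columns
            .iter()
            .find(|(i, _)| *i == index)
            .map(|(_, column)| column)
    }

    // Mirrors the inlined `is_none()` check above: insert only if this
    // index is not cached yet, so repeated responses cannot create duplicates.
    fn merge_data_column(&mut self, index: u64, column: Vec<u8>) {
        if self.get_cached_data_column(index).is_none() {
            self.verified_data_columns.push((index, column));
        }
    }
}
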
@@ -1477,22 +1477,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
                 );
                 return None;
             }
-            Err(e @ BlockError::InternalError(_)) => {
+            // BlobNotRequired is unreachable. Only constructed in `process_gossip_blob`
+            Err(e @ BlockError::InternalError(_)) | Err(e @ BlockError::BlobNotRequired(_)) => {
                 error!(self.log, "Internal block gossip validation error";
                     "error" => %e
                 );
                 return None;
             }
-            Err(e @ BlockError::BlobNotRequired(_)) => {
-                // TODO(das): penalty not implemented yet as other clients may still send us blobs
-                // during early stage of implementation.
-                debug!(self.log, "Received blobs for slot after PeerDAS epoch from peer";
-                    "error" => %e,
-                    "peer_id" => %peer_id,
-                );
-                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
-                return None;
-            }
         };

         metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL);

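The merged arm relies on Rust or-patterns: both error variants bind the same `e` and share one handler. A minimal sketch under a hypothetical two-variant `BlockError` stand-in (the real Lighthouse enum has many more variants):

// Hypothetical two-variant stand-in; Lighthouse's BlockError is larger.
#[derive(Debug)]
enum BlockError {
    InternalError(String),
    BlobNotRequired(u64),
}

fn handle(result: Result<(), BlockError>) {
    match result {
        Ok(()) => {}
        // `e @ pattern` binds the matched error; `|` lets both variants
        // share the arm, as in the diff above.
        Err(e @ BlockError::InternalError(_)) | Err(e @ BlockError::BlobNotRequired(_)) => {
            eprintln!("internal block gossip validation error: {e:?}");
        }
    }
}

fn main() {
    handle(Err(BlockError::BlobNotRequired(42)));
}
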
@@ -1603,9 +1594,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
         let block = verified_block.block.block_cloned();
         let block_root = verified_block.block_root;

-        // TODO(das) Might be too early to issue a request here. We haven't checked that the block
-        // actually includes blob transactions and thus has data. A peer could send a block with
-        // garbage commitments, and make us trigger sampling for a block that does not have data.
+        // Note: okay to issue sampling request before the block is execution verified. If the
+        // proposer sends us a block with invalid blob transactions it can trigger us to issue
+        // sampling queries that will never resolve. This attack is equivalent to withholding data.
+        // Dismissed proposal to move this block to post-execution: https://github.com/sigp/lighthouse/pull/6492
         if block.num_expected_blobs() > 0 {
             // Trigger sampling for block not yet execution valid. At this point column custodials are
             // unlikely to have received their columns. Triggering sampling so early is only viable with

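The gate on `block.num_expected_blobs() > 0` means sampling is only triggered for blocks that actually commit to blob data. A rough sketch of that gating, with a simplified block type where the expected-blob count is assumed to equal the number of KZG commitments (an assumption for illustration):

// Simplified block: the expected blob count is assumed to equal the
// number of KZG commitments the block carries.
struct Block {
    kzg_commitments: Vec<[u8; 48]>,
}

impl Block {
    fn num_expected_blobs(&self) -> usize {
        self.kzg_commitments.len()
    }
}

// Sampling only makes sense when the block commits to data.
fn should_trigger_sampling(block: &Block) -> bool {
    block.num_expected_blobs() > 0
}
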
@@ -336,9 +336,31 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
         self: Arc<NetworkBeaconProcessor<T>>,
         block_root: Hash256,
         custody_columns: DataColumnSidecarList<T::EthSpec>,
-        _seen_timestamp: Duration,
+        seen_timestamp: Duration,
         process_type: BlockProcessType,
     ) {
+        // custody_columns must always have at least one element
+        let Some(slot) = custody_columns.first().map(|d| d.slot()) else {
+            return;
+        };
+
+        if let Ok(current_slot) = self.chain.slot() {
+            if current_slot == slot {
+                let delay = get_slot_delay_ms(seen_timestamp, slot, &self.chain.slot_clock);
+                metrics::observe_duration(&metrics::BEACON_BLOB_RPC_SLOT_START_DELAY_TIME, delay);
+            }
+        }
+
+        let mut indices = custody_columns.iter().map(|d| d.index).collect::<Vec<_>>();
+        indices.sort_unstable();
+        debug!(
+            self.log,
+            "RPC custody data columns received";
+            "indices" => ?indices,
+            "block_root" => %block_root,
+            "slot" => %slot,
+        );
+
         let mut result = self
             .chain
             .process_rpc_custody_columns(custody_columns)

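The delay metric compares the response's seen timestamp against the start of its slot. A hedged sketch of the arithmetic a `get_slot_delay_ms`-style helper presumably performs; the genesis-plus-fixed-slot-duration model below is an assumption, not Lighthouse's `SlotClock` API:

use std::time::Duration;

// Assumed model: slot N starts at genesis + N * seconds_per_slot.
// Returns how long after the slot start the response was first seen.
fn slot_start_delay(
    seen_timestamp: Duration, // time since UNIX epoch when the response arrived
    slot: u64,
    genesis: Duration, // genesis time since UNIX epoch
    seconds_per_slot: u64,
) -> Duration {
    let slot_start = genesis + Duration::from_secs(slot * seconds_per_slot);
    seen_timestamp.saturating_sub(slot_start)
}
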
@@ -479,8 +479,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
                 // continue_request will send for processing as the request state is AwaitingProcessing
             }
             Err(e) => {
-                // TODO(das): is it okay to not log the peer source of request failures? Then we
-                // should log individual requests failures in the SyncNetworkContext
+                // No need to log peer source here. When sending a DataColumnsByRoot request we log
+                // the peer and the request ID which is linked to this `id` value here.
                 debug!(self.log,
                     "Received lookup download failure";
                     "block_root" => ?block_root,

@@ -1217,12 +1217,10 @@ impl<T: BeaconChainTypes> SyncManager<T> {
         requester: CustodyRequester,
         response: CustodyByRootResult<T::EthSpec>,
     ) {
-        // TODO(das): get proper timestamp
-        let seen_timestamp = timestamp_now();
         self.block_lookups
             .on_download_response::<CustodyRequestState<T::EthSpec>>(
                 requester.0,
-                response.map(|(columns, peer_group)| (columns, peer_group, seen_timestamp)),
+                response,
                 &mut self.network,
             );
     }

@@ -68,7 +68,9 @@ impl<T> RpcEvent<T> {

 pub type RpcResponseResult<T> = Result<(T, Duration), RpcResponseError>;

-pub type CustodyByRootResult<T> = Result<(DataColumnSidecarList<T>, PeerGroup), RpcResponseError>;
+/// Duration = latest seen timestamp of all received data columns
+pub type CustodyByRootResult<T> =
+    Result<(DataColumnSidecarList<T>, PeerGroup, Duration), RpcResponseError>;

 #[derive(Debug)]
 pub enum RpcResponseError {

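With the widened alias, the timestamp rides alongside the columns and peer group all the way to the processor, so callers no longer reconstruct it with `timestamp_now()`. A sketch of consuming the three-element tuple, with simplified stand-ins for the column list, `PeerGroup`, and error type:

use std::time::Duration;

// Simplified stand-ins for DataColumnSidecarList, PeerGroup and RpcResponseError.
type Columns = Vec<Vec<u8>>;
#[derive(Debug)]
struct PeerGroup;
#[derive(Debug)]
struct RpcResponseError;

type CustodyByRootResult = Result<(Columns, PeerGroup, Duration), RpcResponseError>;

fn on_custody_response(result: CustodyByRootResult) {
    // The timestamp is part of the payload; no timestamp_now() fallback here.
    if let Ok((columns, _peer_group, seen_timestamp)) = result {
        println!("{} columns, latest seen at {seen_timestamp:?}", columns.len());
    }
}
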
@@ -1190,7 +1192,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         // Convert a result from internal format of `ActiveCustodyRequest` (error first to use ?) to
         // an Option first to use in an `if let Some() { act on result }` block.
         match result.as_ref() {
-            Some(Ok((columns, peer_group))) => {
+            Some(Ok((columns, peer_group, _))) => {
                 debug!(self.log, "Custody request success, removing"; "id" => ?id, "count" => columns.len(), "peers" => ?peer_group)
             }
             Some(Err(e)) => {

@@ -1208,7 +1210,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         id: Id,
         block_root: Hash256,
         block: RpcBlock<T::EthSpec>,
-        duration: Duration,
+        seen_timestamp: Duration,
     ) -> Result<(), SendErrorProcessor> {
         let beacon_processor = self
             .beacon_processor_if_enabled()

@@ -1221,7 +1223,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
             .send_rpc_beacon_block(
                 block_root,
                 block,
-                duration,
+                seen_timestamp,
                 BlockProcessType::SingleBlock { id },
             )
             .map_err(|e| {

@@ -1239,7 +1241,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         id: Id,
         block_root: Hash256,
         blobs: FixedBlobSidecarList<T::EthSpec>,
-        duration: Duration,
+        seen_timestamp: Duration,
     ) -> Result<(), SendErrorProcessor> {
         let beacon_processor = self
             .beacon_processor_if_enabled()

@@ -1252,7 +1254,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
             .send_rpc_blobs(
                 block_root,
                 blobs,
-                duration,
+                seen_timestamp,
                 BlockProcessType::SingleBlob { id },
             )
             .map_err(|e| {

@@ -1270,7 +1272,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         _id: Id,
         block_root: Hash256,
         custody_columns: DataColumnSidecarList<T::EthSpec>,
-        duration: Duration,
+        seen_timestamp: Duration,
         process_type: BlockProcessType,
     ) -> Result<(), SendErrorProcessor> {
         let beacon_processor = self

@@ -1280,7 +1282,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         debug!(self.log, "Sending custody columns for processing"; "block" => ?block_root, "process_type" => ?process_type);

         beacon_processor
-            .send_rpc_custody_columns(block_root, custody_columns, duration, process_type)
+            .send_rpc_custody_columns(block_root, custody_columns, seen_timestamp, process_type)
             .map_err(|e| {
                 error!(
                     self.log,

@@ -1,7 +1,7 @@
 use crate::sync::network_context::{
     DataColumnsByRootRequestId, DataColumnsByRootSingleBlockRequest,
 };
+use beacon_chain::validator_monitor::timestamp_now;
 use beacon_chain::BeaconChainTypes;
 use fnv::FnvHashMap;
 use lighthouse_network::service::api_types::{CustodyId, DataColumnsByRootRequester};

@@ -61,7 +61,8 @@ struct ActiveBatchColumnsRequest {
     indices: Vec<ColumnIndex>,
 }

-pub type CustodyRequestResult<E> = Result<Option<(DataColumnSidecarList<E>, PeerGroup)>, Error>;
+pub type CustodyRequestResult<E> =
+    Result<Option<(DataColumnSidecarList<E>, PeerGroup, Duration)>, Error>;

 impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
     pub(crate) fn new(

@@ -102,8 +103,6 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
         resp: RpcResponseResult<DataColumnSidecarList<T::EthSpec>>,
         cx: &mut SyncNetworkContext<T>,
     ) -> CustodyRequestResult<T::EthSpec> {
-        // TODO(das): Should downscore peers for verify errors here
-
         let Some(batch_request) = self.active_batch_columns_requests.get_mut(&req_id) else {
             warn!(self.log,
                 "Received custody column response for unrequested index";

@@ -115,7 +114,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
         };

         match resp {
-            Ok((data_columns, _seen_timestamp)) => {
+            Ok((data_columns, seen_timestamp)) => {
                 debug!(self.log,
                     "Custody column download success";
                     "id" => ?self.custody_id,

@@ -141,7 +140,12 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
                         .ok_or(Error::BadState("unknown column_index".to_owned()))?;

                     if let Some(data_column) = data_columns.remove(column_index) {
-                        column_request.on_download_success(req_id, peer_id, data_column)?;
+                        column_request.on_download_success(
+                            req_id,
+                            peer_id,
+                            data_column,
+                            seen_timestamp,
+                        )?;
                     } else {
                         // Peer does not have the requested data.
                         // TODO(das) do not consider this case a success. We know for sure the block has

@@ -204,20 +208,23 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
         if self.column_requests.values().all(|r| r.is_downloaded()) {
             // All requests have completed successfully.
             let mut peers = HashMap::<PeerId, Vec<usize>>::new();
+            let mut seen_timestamps = vec![];
             let columns = std::mem::take(&mut self.column_requests)
                 .into_values()
                 .map(|request| {
-                    let (peer, data_column) = request.complete()?;
+                    let (peer, data_column, seen_timestamp) = request.complete()?;
                     peers
                         .entry(peer)
                         .or_default()
                         .push(data_column.index as usize);
+                    seen_timestamps.push(seen_timestamp);
                     Ok(data_column)
                 })
                 .collect::<Result<Vec<_>, _>>()?;

             let peer_group = PeerGroup::from_set(peers);
-            return Ok(Some((columns, peer_group)));
+            let max_seen_timestamp = seen_timestamps.into_iter().max().unwrap_or(timestamp_now());
+            return Ok(Some((columns, peer_group, max_seen_timestamp)));
         }

         let mut columns_to_request_by_peer = HashMap::<PeerId, Vec<ColumnIndex>>::new();

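This reduces the per-column timestamps to the latest one, matching the doc comment on `CustodyByRootResult` (Duration = latest seen timestamp of all received data columns). A tiny sketch of that reduction; `now` stands in for the `timestamp_now()` fallback:

use std::time::Duration;

// Reduce per-column seen timestamps to the latest; fall back to `now`
// if the list is somehow empty (it should not be once all requests downloaded).
fn max_seen_timestamp(seen_timestamps: Vec<Duration>, now: Duration) -> Duration {
    seen_timestamps.into_iter().max().unwrap_or(now)
}

fn main() {
    let ts = vec![Duration::from_secs(5), Duration::from_secs(7)];
    assert_eq!(max_seen_timestamp(ts, Duration::from_secs(9)), Duration::from_secs(7));
}
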
@@ -335,7 +342,7 @@ struct ColumnRequest<E: EthSpec> {
 enum Status<E: EthSpec> {
     NotStarted(Instant),
     Downloading(DataColumnsByRootRequestId),
-    Downloaded(PeerId, Arc<DataColumnSidecar<E>>),
+    Downloaded(PeerId, Arc<DataColumnSidecar<E>>, Duration),
 }

 impl<E: EthSpec> ColumnRequest<E> {

@@ -404,6 +411,7 @@ impl<E: EthSpec> ColumnRequest<E> {
         req_id: DataColumnsByRootRequestId,
         peer_id: PeerId,
         data_column: Arc<DataColumnSidecar<E>>,
+        seen_timestamp: Duration,
     ) -> Result<(), Error> {
         match &self.status {
             Status::Downloading(expected_req_id) => {

@@ -413,7 +421,7 @@ impl<E: EthSpec> ColumnRequest<E> {
                         req_id,
                     });
                 }
-                self.status = Status::Downloaded(peer_id, data_column);
+                self.status = Status::Downloaded(peer_id, data_column, seen_timestamp);
                 Ok(())
             }
             other => Err(Error::BadState(format!(

@@ -422,9 +430,11 @@ impl<E: EthSpec> ColumnRequest<E> {
         }
     }

-    fn complete(self) -> Result<(PeerId, Arc<DataColumnSidecar<E>>), Error> {
+    fn complete(self) -> Result<(PeerId, Arc<DataColumnSidecar<E>>, Duration), Error> {
         match self.status {
-            Status::Downloaded(peer_id, data_column) => Ok((peer_id, data_column)),
+            Status::Downloaded(peer_id, data_column, seen_timestamp) => {
+                Ok((peer_id, data_column, seen_timestamp))
+            }
             other => Err(Error::BadState(format!(
                 "bad state complete expected Downloaded got {other:?}"
             ))),

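The `Downloaded` variant now carries the seen timestamp through to `complete()`. A self-contained sketch of that state-machine shape with simplified types (`String` for the peer id and `Vec<u8>` for the column payload are assumptions):

use std::time::Duration;

// Simplified state machine for a single column request.
#[derive(Debug)]
enum Status {
    Downloading(u64), // request id
    Downloaded(String, Vec<u8>, Duration),
}

// As in the diff: only the Downloaded state can complete, and it now
// yields the seen timestamp alongside the peer and the column.
fn complete(status: Status) -> Result<(String, Vec<u8>, Duration), String> {
    match status {
        Status::Downloaded(peer_id, data_column, seen_timestamp) => {
            Ok((peer_id, data_column, seen_timestamp))
        }
        other => Err(format!("bad state complete expected Downloaded got {other:?}")),
    }
}
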
@@ -713,7 +713,6 @@ impl TestRig {
         self.complete_data_columns_by_root_request(id, data_columns);

         // Expect work event
-        // TODO(das): worth it to append sender id to the work event for stricter assertion?
         self.expect_rpc_sample_verify_work_event();

         // Respond with valid result

@@ -755,7 +754,6 @@ impl TestRig {
         }

         // Expect work event
-        // TODO(das): worth it to append sender id to the work event for stricter assertion?
         self.expect_rpc_custody_column_work_event();

         // Respond with valid result

@@ -17,6 +17,8 @@ pub enum DataColumnCustodyGroupError {
 /// The `get_custody_groups` function is used to determine the custody groups that a node is
 /// assigned to.
 ///
+/// Note: `get_custody_groups(node_id, x)` is a subset of `get_custody_groups(node_id, y)` if `x < y`.
+///
 /// spec: https://github.com/ethereum/consensus-specs/blob/8e0d0d48e81d6c7c5a8253ab61340f5ea5bac66a/specs/fulu/das-core.md#get_custody_groups
 pub fn get_custody_groups(
     raw_node_id: [u8; 32],

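One way the noted subset property can hold (a sketch, not the spec's algorithm): if custody groups are taken as the first `count` entries of a deterministic, node-specific ordering of all groups, then the result for a smaller `count` is a prefix, hence a subset, of the result for a larger one. The rotation-based ordering below is purely illustrative; the spec derives the ordering by hashing the node id:

// Illustrative only: a deterministic per-node ordering via rotation.
// Assumes total_groups > 0. The spec instead hashes (node_id, counter).
fn custody_groups(node_id: u64, count: usize, total_groups: u64) -> Vec<u64> {
    let mut ordering: Vec<u64> = (0..total_groups).collect();
    ordering.rotate_left((node_id % total_groups) as usize);
    ordering.into_iter().take(count).collect()
}

fn main() {
    let small = custody_groups(42, 4, 128);
    let large = custody_groups(42, 8, 128);
    // Subset property: every group custodied at count=4 is custodied at count=8.
    assert!(small.iter().all(|g| large.contains(g)));
}
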