Resolve merge conflicts

This commit is contained in:
Eitan Seri- Levi
2026-01-27 22:46:46 -08:00
67 changed files with 2383 additions and 1048 deletions

View File

@@ -606,7 +606,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
parent = None,
level = "debug",
skip_all,
fields(slot = %column_sidecar.slot(), block_root = ?column_sidecar.block_root(), index = column_sidecar.index),
fields(slot = %column_sidecar.slot(), block_root = ?column_sidecar.block_root(), index = column_sidecar.index()),
)]
pub async fn process_gossip_data_column_sidecar(
self: &Arc<Self>,
@@ -618,7 +618,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
) {
let slot = column_sidecar.slot();
let block_root = column_sidecar.block_root();
let index = column_sidecar.index;
let index = *column_sidecar.index();
let delay = get_slot_delay_ms(seen_duration, slot, &self.chain.slot_clock);
// Log metrics to track delay from other nodes on the network.
metrics::observe_duration(
@@ -664,6 +664,15 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
Err(err) => {
match err {
GossipDataColumnError::InvalidVariant => {
// TODO(gloas) we should probably penalize the peer here
debug!(
%slot,
%block_root,
%index,
"Invalid gossip data column variant."
)
}
GossipDataColumnError::PriorKnownUnpublished => {
debug!(
%slot,

View File

@@ -980,7 +980,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
.into_iter()
.map(|d| {
let subnet =
DataColumnSubnetId::from_column_index(d.index, &chain.spec);
DataColumnSubnetId::from_column_index(*d.index(), &chain.spec);
PubsubMessage::DataColumnSidecar(Box::new((subnet, d)))
})
.collect(),

View File

@@ -748,7 +748,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
)
.ok_or((RpcErrorResponse::ServerError, "shutting down"))?
.await
.map_err(|_| (RpcErrorResponse::ServerError, "tokio join"))??;
.map_err(|_| (RpcErrorResponse::ServerError, "tokio join"))??
.iter()
.map(|(root, _)| *root)
.collect::<Vec<_>>();
let current_slot = self
.chain
@@ -861,7 +864,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
req_start_slot: u64,
req_count: u64,
req_type: &str,
) -> Result<Vec<Hash256>, (RpcErrorResponse, &'static str)> {
) -> Result<Vec<(Hash256, Slot)>, (RpcErrorResponse, &'static str)> {
let start_time = std::time::Instant::now();
let finalized_slot = self
.chain
@@ -871,7 +874,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
.epoch
.start_slot(T::EthSpec::slots_per_epoch());
let (block_roots, source) = if req_start_slot >= finalized_slot.as_u64() {
let (block_roots_and_slots, source) = if req_start_slot >= finalized_slot.as_u64() {
// If the entire requested range is after finalization, use fork_choice
(
self.chain
@@ -915,14 +918,14 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
req_type,
start_slot = %req_start_slot,
req_count,
roots_count = block_roots.len(),
roots_count = block_roots_and_slots.len(),
source,
elapsed = ?elapsed,
%finalized_slot,
"Range request block roots retrieved"
);
Ok(block_roots)
Ok(block_roots_and_slots)
}
/// Get block roots for a `BlocksByRangeRequest` from the store using roots iterator.
@@ -930,7 +933,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
&self,
start_slot: u64,
count: u64,
) -> Result<Vec<Hash256>, (RpcErrorResponse, &'static str)> {
) -> Result<Vec<(Hash256, Slot)>, (RpcErrorResponse, &'static str)> {
let forwards_block_root_iter =
match self.chain.forwards_iter_block_roots(Slot::from(start_slot)) {
Ok(iter) => iter,
@@ -976,11 +979,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
};
// remove all skip slots i.e. duplicated roots
Ok(block_roots
.into_iter()
.map(|(root, _)| root)
.unique()
.collect::<Vec<_>>())
Ok(block_roots.into_iter().unique().collect::<Vec<_>>())
}
/// Handle a `BlobsByRange` request from the peer.
@@ -1087,7 +1086,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
};
}
let block_roots =
let block_roots_and_slots =
self.get_block_roots_for_slot_range(req.start_slot, effective_count, "BlobsByRange")?;
let current_slot = self
@@ -1108,7 +1107,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let mut blobs_sent = 0;
for root in block_roots {
for (root, _) in block_roots_and_slots {
match self.chain.get_blobs(&root) {
Ok(blob_sidecar_list) => {
for blob_sidecar in blob_sidecar_list.iter() {
@@ -1247,7 +1246,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
};
}
let block_roots =
let block_roots_and_slots =
self.get_block_roots_for_slot_range(req.start_slot, req.count, "DataColumnsByRange")?;
let mut data_columns_sent = 0;
@@ -1264,9 +1263,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
.filter(|c| available_columns.contains(c))
.collect::<Vec<_>>();
for root in block_roots {
for (root, slot) in block_roots_and_slots {
let fork_name = self.chain.spec.fork_name_at_slot::<T::EthSpec>(slot);
for index in &indices_to_retrieve {
match self.chain.get_data_column(&root, index) {
match self.chain.get_data_column(&root, index, fork_name) {
Ok(Some(data_column_sidecar)) => {
// Due to skip slots, data columns could be out of the range, we ensure they
// are in the range before sending

View File

@@ -369,7 +369,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
metrics::observe_duration(&metrics::BEACON_BLOB_RPC_SLOT_START_DELAY_TIME, delay);
}
let mut indices = custody_columns.iter().map(|d| d.index).collect::<Vec<_>>();
let mut indices = custody_columns
.iter()
.map(|d| *d.index())
.collect::<Vec<_>>();
indices.sort_unstable();
debug!(
?indices,

View File

@@ -10,7 +10,7 @@ use crate::{
};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::custody_context::NodeCustodyType;
use beacon_chain::data_column_verification::validate_data_column_sidecar_for_gossip;
use beacon_chain::data_column_verification::validate_data_column_sidecar_for_gossip_fulu;
use beacon_chain::kzg_utils::blobs_to_data_column_sidecars;
use beacon_chain::observed_data_sidecars::DoNotObserve;
use beacon_chain::test_utils::{
@@ -39,12 +39,14 @@ use std::iter::Iterator;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use types::data::{BlobIdentifier, FixedBlobSidecarList};
use types::{
AttesterSlashing, BlobSidecar, BlobSidecarList, ChainSpec, DataColumnSidecarList,
DataColumnSubnetId, Epoch, EthSpec, Hash256, MainnetEthSpec, ProposerSlashing,
SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, SingleAttestation, Slot,
SubnetId,
AttesterSlashing, BlobSidecar, ChainSpec, DataColumnSidecarList, DataColumnSubnetId, Epoch,
EthSpec, Hash256, MainnetEthSpec, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock,
SignedVoluntaryExit, SingleAttestation, Slot, SubnetId,
};
use types::{
BlobSidecarList,
data::{BlobIdentifier, FixedBlobSidecarList},
};
type E = MainnetEthSpec;
@@ -309,7 +311,7 @@ impl TestRig {
)
.unwrap()
.into_iter()
.filter(|c| sampling_indices.contains(&c.index))
.filter(|c| sampling_indices.contains(c.index()))
.collect::<Vec<_>>();
(None, Some(custody_columns))
@@ -386,7 +388,7 @@ impl TestRig {
.send_gossip_data_column_sidecar(
junk_message_id(),
junk_peer_id(),
DataColumnSubnetId::from_column_index(data_column.index, &self.chain.spec),
DataColumnSubnetId::from_column_index(*data_column.index(), &self.chain.spec),
data_column.clone(),
Duration::from_secs(0),
)
@@ -1115,8 +1117,8 @@ async fn accept_processed_gossip_data_columns_without_import() {
.into_iter()
.map(|data_column| {
let subnet_id =
DataColumnSubnetId::from_column_index(data_column.index, &rig.chain.spec);
validate_data_column_sidecar_for_gossip::<_, DoNotObserve>(
DataColumnSubnetId::from_column_index(*data_column.index(), &rig.chain.spec);
validate_data_column_sidecar_for_gossip_fulu::<_, DoNotObserve>(
data_column,
subnet_id,
&rig.chain,
@@ -1948,7 +1950,7 @@ async fn test_data_columns_by_range_request_only_returns_requested_columns() {
} = next
{
if let Some(column) = data_column {
received_columns.push(column.index);
received_columns.push(*column.index());
} else {
break;
}

View File

@@ -88,7 +88,11 @@ impl<E: EthSpec> BlockComponent<E> {
match self {
BlockComponent::Block(block) => block.value.parent_root(),
BlockComponent::Blob(blob) => blob.value.block_parent_root(),
BlockComponent::DataColumn(column) => column.value.block_parent_root(),
BlockComponent::DataColumn(column) => match column.value.as_ref() {
DataColumnSidecar::Fulu(column) => column.block_parent_root(),
// TODO(gloas) we don't have a parent root post gloas, not sure what to do here
DataColumnSidecar::Gloas(column) => column.beacon_block_root,
},
}
}
fn get_type(&self) -> &'static str {

View File

@@ -346,7 +346,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
for column in data_columns {
let block_root = column.block_root();
let index = column.index;
let index = *column.index();
if data_columns_by_block
.entry(block_root)
.or_default()
@@ -624,7 +624,7 @@ mod tests {
*req,
blocks
.iter()
.flat_map(|b| b.1.iter().filter(|d| d.index == column_index).cloned())
.flat_map(|b| b.1.iter().filter(|d| *d.index() == column_index).cloned())
.collect(),
)
.unwrap();
@@ -707,7 +707,7 @@ mod tests {
.iter()
.flat_map(|b| {
b.1.iter()
.filter(|d| column_indices.contains(&d.index))
.filter(|d| column_indices.contains(d.index()))
.cloned()
})
.collect::<Vec<_>>(),
@@ -779,7 +779,7 @@ mod tests {
*req,
blocks
.iter()
.flat_map(|b| b.1.iter().filter(|d| d.index == column_index).cloned())
.flat_map(|b| b.1.iter().filter(|d| *d.index() == column_index).cloned())
.collect(),
)
.unwrap();
@@ -864,7 +864,7 @@ mod tests {
*req1,
blocks
.iter()
.flat_map(|b| b.1.iter().filter(|d| d.index == 1).cloned())
.flat_map(|b| b.1.iter().filter(|d| *d.index() == 1).cloned())
.collect(),
)
.unwrap();
@@ -891,7 +891,7 @@ mod tests {
new_columns_req_id,
blocks
.iter()
.flat_map(|b| b.1.iter().filter(|d| d.index == 2).cloned())
.flat_map(|b| b.1.iter().filter(|d| *d.index() == 2).cloned())
.collect(),
)
.unwrap();
@@ -957,7 +957,7 @@ mod tests {
*req1,
blocks
.iter()
.flat_map(|b| b.1.iter().filter(|d| d.index == 1).cloned())
.flat_map(|b| b.1.iter().filter(|d| *d.index() == 1).cloned())
.collect(),
)
.unwrap();

View File

@@ -871,20 +871,28 @@ impl<T: BeaconChainTypes> SyncManager<T> {
SyncMessage::UnknownParentDataColumn(peer_id, data_column) => {
let data_column_slot = data_column.slot();
let block_root = data_column.block_root();
let parent_root = data_column.block_parent_root();
debug!(%block_root, %parent_root, "Received unknown parent data column message");
self.handle_unknown_parent(
peer_id,
block_root,
parent_root,
data_column_slot,
BlockComponent::DataColumn(DownloadResult {
value: data_column,
block_root,
seen_timestamp: timestamp_now(),
peer_group: PeerGroup::from_single(peer_id),
}),
);
match data_column.as_ref() {
DataColumnSidecar::Fulu(column) => {
let parent_root = column.block_parent_root();
debug!(%block_root, %parent_root, "Received unknown parent data column message");
self.handle_unknown_parent(
peer_id,
block_root,
parent_root,
data_column_slot,
BlockComponent::DataColumn(DownloadResult {
value: data_column,
block_root,
seen_timestamp: timestamp_now(),
peer_group: PeerGroup::from_single(peer_id),
}),
);
}
// TODO(gloas) support gloas data column variant
DataColumnSidecar::Gloas(_) => {
error!("Gloas variant not yet supported")
}
}
}
SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_root) => {
if !self.notified_unknown_roots.contains(&(peer_id, block_root)) {

View File

@@ -127,7 +127,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
// requested index. The worse case is 128 loops over a 128 item vec + mutation to
// drop the consumed columns.
let mut data_columns = HashMap::<ColumnIndex, _>::from_iter(
data_columns.into_iter().map(|d| (d.index, d)),
data_columns.into_iter().map(|d| (*d.index(), d)),
);
// Accumulate columns that the peer does not have to issue a single log per request
let mut missing_column_indexes = vec![];
@@ -209,7 +209,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
peers
.entry(peer)
.or_default()
.push(data_column.index as usize);
.push(*data_column.index() as usize);
seen_timestamps.push(seen_timestamp);
Ok(data_column)
})

View File

@@ -28,18 +28,22 @@ impl<E: EthSpec> ActiveRequestItems for DataColumnsByRangeRequestItems<E> {
{
return Err(LookupVerifyError::UnrequestedSlot(data_column.slot()));
}
if !self.request.columns.contains(&data_column.index) {
return Err(LookupVerifyError::UnrequestedIndex(data_column.index));
if !self.request.columns.contains(data_column.index()) {
return Err(LookupVerifyError::UnrequestedIndex(*data_column.index()));
}
if !data_column.verify_inclusion_proof() {
if let DataColumnSidecar::Fulu(data_column) = data_column.as_ref()
&& !data_column.verify_inclusion_proof()
{
return Err(LookupVerifyError::InvalidInclusionProof);
}
if self.items.iter().any(|existing| {
existing.slot() == data_column.slot() && existing.index == data_column.index
existing.slot() == data_column.slot() && *existing.index() == *data_column.index()
}) {
return Err(LookupVerifyError::DuplicatedData(
data_column.slot(),
data_column.index,
*data_column.index(),
));
}

View File

@@ -57,16 +57,24 @@ impl<E: EthSpec> ActiveRequestItems for DataColumnsByRootRequestItems<E> {
if self.request.block_root != block_root {
return Err(LookupVerifyError::UnrequestedBlockRoot(block_root));
}
if !data_column.verify_inclusion_proof() {
if let DataColumnSidecar::Fulu(data_column) = data_column.as_ref()
&& !data_column.verify_inclusion_proof()
{
return Err(LookupVerifyError::InvalidInclusionProof);
}
if !self.request.indices.contains(&data_column.index) {
return Err(LookupVerifyError::UnrequestedIndex(data_column.index));
if !self.request.indices.contains(data_column.index()) {
return Err(LookupVerifyError::UnrequestedIndex(*data_column.index()));
}
if self.items.iter().any(|d| d.index == data_column.index) {
if self
.items
.iter()
.any(|d| *d.index() == *data_column.index())
{
return Err(LookupVerifyError::DuplicatedData(
data_column.slot(),
data_column.index,
*data_column.index(),
));
}

View File

@@ -189,9 +189,15 @@ impl<T: BeaconChainTypes> RangeDataColumnBatchRequest<T> {
.unique()
.collect::<Vec<_>>();
// TODO(gloas) no block signatures to check post-gloas, double check what to do here
let column_block_signatures = columns
.iter()
.map(|column| column.signed_block_header.signature.clone())
.filter_map(|column| match column.as_ref() {
DataColumnSidecar::Fulu(column) => {
Some(column.signed_block_header.signature.clone())
}
_ => None,
})
.unique()
.collect::<Vec<_>>();
@@ -201,8 +207,8 @@ impl<T: BeaconChainTypes> RangeDataColumnBatchRequest<T> {
// If there are no block roots, penalize all peers
[] => {
for column in &columns {
if let Some(naughty_peer) = column_to_peer.get(&column.index) {
naughty_peers.push((column.index, *naughty_peer));
if let Some(naughty_peer) = column_to_peer.get(column.index()) {
naughty_peers.push((*column.index(), *naughty_peer));
}
}
continue;
@@ -212,9 +218,9 @@ impl<T: BeaconChainTypes> RangeDataColumnBatchRequest<T> {
for column in columns {
if column_block_roots.contains(&column.block_root())
&& block_root != column.block_root()
&& let Some(naughty_peer) = column_to_peer.get(&column.index)
&& let Some(naughty_peer) = column_to_peer.get(column.index())
{
naughty_peers.push((column.index, *naughty_peer));
naughty_peers.push((*column.index(), *naughty_peer));
}
}
continue;
@@ -227,17 +233,19 @@ impl<T: BeaconChainTypes> RangeDataColumnBatchRequest<T> {
// If there are no block signatures, penalize all peers
[] => {
for column in &columns {
if let Some(naughty_peer) = column_to_peer.get(&column.index) {
naughty_peers.push((column.index, *naughty_peer));
if let Some(naughty_peer) = column_to_peer.get(column.index()) {
naughty_peers.push((*column.index(), *naughty_peer));
}
}
continue;
}
// If theres more than one unique block signature, penalize the peers serving the
// invalid block signatures.
// invalid block signatures. This check is only relevant for Fulu.
column_block_signatures => {
for column in columns {
if column_block_signatures.contains(&column.signed_block_header.signature)
if let DataColumnSidecar::Fulu(column) = column.as_ref()
&& column_block_signatures
.contains(&column.signed_block_header.signature)
&& block.signature() != &column.signed_block_header.signature
&& let Some(naughty_peer) = column_to_peer.get(&column.index)
{
@@ -251,8 +259,8 @@ impl<T: BeaconChainTypes> RangeDataColumnBatchRequest<T> {
// if the block root doesn't match the columns block root, penalize the peers
if block_root != column_block_root {
for column in &columns {
if let Some(naughty_peer) = column_to_peer.get(&column.index) {
naughty_peers.push((column.index, *naughty_peer));
if let Some(naughty_peer) = column_to_peer.get(column.index()) {
naughty_peers.push((*column.index(), *naughty_peer));
}
}
}
@@ -260,13 +268,13 @@ impl<T: BeaconChainTypes> RangeDataColumnBatchRequest<T> {
// If the block signature doesn't match the columns block signature, penalize the peers
if block.signature() != column_block_signature {
for column in &columns {
if let Some(naughty_peer) = column_to_peer.get(&column.index) {
naughty_peers.push((column.index, *naughty_peer));
if let Some(naughty_peer) = column_to_peer.get(column.index()) {
naughty_peers.push((*column.index(), *naughty_peer));
}
}
}
let received_columns = columns.iter().map(|c| c.index).collect::<HashSet<_>>();
let received_columns = columns.iter().map(|c| *c.index()).collect::<HashSet<_>>();
let missing_columns = expected_custody_columns
.difference(&received_columns)

View File

@@ -393,7 +393,7 @@ impl TestRig {
let data_sidecars = if fork.fulu_enabled() {
store
.get_data_columns(&block_root)
.get_data_columns(&block_root, fork)
.unwrap()
.map(|columns| {
columns