Resolve merge conflicts

This commit is contained in:
Eitan Seri- Levi
2026-03-16 02:24:50 -07:00
270 changed files with 16594 additions and 8798 deletions

View File

@@ -17,7 +17,8 @@ use crate::sync::block_lookups::SingleLookupId;
use crate::sync::block_sidecar_coupling::CouplingError;
use crate::sync::network_context::requests::BlobsByRootSingleBlockRequest;
use crate::sync::range_data_column_batch_request::RangeDataColumnBatchRequest;
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::block_verification_types::LookupBlock;
use beacon_chain::block_verification_types::{AsBlock, RangeSyncBlock};
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState};
use custody::CustodyRequestResult;
use fnv::FnvHashMap;
@@ -735,7 +736,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
&mut self,
id: ComponentsByRangeRequestId,
range_block_component: RangeBlockComponent<T::EthSpec>,
) -> Option<Result<Vec<RpcBlock<T::EthSpec>>, RpcResponseError>> {
) -> Option<Result<Vec<RangeSyncBlock<T::EthSpec>>, RpcResponseError>> {
let Entry::Occupied(mut entry) = self.components_by_range_requests.entry(id) else {
metrics::inc_counter_vec(&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, &["range_blocks"]);
return None;
@@ -1096,13 +1097,14 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
})?;
// Include only the blob indexes not yet imported (received through gossip)
let custody_indexes_to_fetch = self
let mut custody_indexes_to_fetch = self
.chain
.sampling_columns_for_epoch(current_epoch)
.iter()
.copied()
.filter(|index| !custody_indexes_imported.contains(index))
.collect::<Vec<_>>();
custody_indexes_to_fetch.sort_unstable();
if custody_indexes_to_fetch.is_empty() {
// No indexes required, do not issue any request
@@ -1431,7 +1433,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
}
})
});
self.on_rpc_response_result(id, "BlocksByRoot", resp, peer_id, |_| 1)
self.on_rpc_response_result(resp, peer_id)
}
pub(crate) fn on_single_blob_response(
@@ -1460,7 +1462,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
}
})
});
self.on_rpc_response_result(id, "BlobsByRoot", resp, peer_id, |_| 1)
self.on_rpc_response_result(resp, peer_id)
}
#[allow(clippy::type_complexity)]
@@ -1473,7 +1475,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
let resp = self
.data_columns_by_root_requests
.on_response(id, rpc_event);
self.on_rpc_response_result(id, "DataColumnsByRoot", resp, peer_id, |_| 1)
self.on_rpc_response_result(resp, peer_id)
}
#[allow(clippy::type_complexity)]
@@ -1484,7 +1486,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
rpc_event: RpcEvent<Arc<SignedBeaconBlock<T::EthSpec>>>,
) -> Option<RpcResponseResult<Vec<Arc<SignedBeaconBlock<T::EthSpec>>>>> {
let resp = self.blocks_by_range_requests.on_response(id, rpc_event);
self.on_rpc_response_result(id, "BlocksByRange", resp, peer_id, |b| b.len())
self.on_rpc_response_result(resp, peer_id)
}
#[allow(clippy::type_complexity)]
@@ -1495,7 +1497,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
rpc_event: RpcEvent<Arc<BlobSidecar<T::EthSpec>>>,
) -> Option<RpcResponseResult<Vec<Arc<BlobSidecar<T::EthSpec>>>>> {
let resp = self.blobs_by_range_requests.on_response(id, rpc_event);
self.on_rpc_response_result(id, "BlobsByRangeRequest", resp, peer_id, |b| b.len())
self.on_rpc_response_result(resp, peer_id)
}
#[allow(clippy::type_complexity)]
@@ -1508,36 +1510,15 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
let resp = self
.data_columns_by_range_requests
.on_response(id, rpc_event);
self.on_rpc_response_result(id, "DataColumnsByRange", resp, peer_id, |d| d.len())
self.on_rpc_response_result(resp, peer_id)
}
fn on_rpc_response_result<I: std::fmt::Display, R, F: FnOnce(&R) -> usize>(
/// Common handler for consistent scoring of RpcResponseError
fn on_rpc_response_result<R>(
&mut self,
id: I,
method: &'static str,
resp: Option<RpcResponseResult<R>>,
peer_id: PeerId,
get_count: F,
) -> Option<RpcResponseResult<R>> {
match &resp {
None => {}
Some(Ok((v, _))) => {
debug!(
%id,
method,
count = get_count(v),
"Sync RPC request completed"
);
}
Some(Err(e)) => {
debug!(
%id,
method,
error = ?e,
"Sync RPC request error"
);
}
}
if let Some(Err(RpcResponseError::VerifyError(e))) = &resp {
self.report_peer(peer_id, PeerAction::LowToleranceError, e.into());
}
@@ -1609,21 +1590,15 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.beacon_processor_if_enabled()
.ok_or(SendErrorProcessor::ProcessorNotAvailable)?;
let block = RpcBlock::new(
block,
None,
self.chain.data_availability_checker.v1(),
self.chain.spec.clone(),
)
.map_err(|_| SendErrorProcessor::SendError)?;
let lookup_block = LookupBlock::new(block);
debug!(block = ?block_root, id, "Sending block for processing");
debug!(block = ?block_root, block_slot = %lookup_block.slot(), id, "Sending block for processing");
// Lookup sync event safety: If `beacon_processor.send_rpc_beacon_block` returns Ok() sync
// must receive a single `SyncMessage::BlockComponentProcessed` with this process type
beacon_processor
.send_rpc_beacon_block(
.send_lookup_beacon_block(
block_root,
block,
lookup_block,
seen_timestamp,
BlockProcessType::SingleBlock { id },
)