Remove peer sampling code (#7768)

Peer sampling has been completely removed from the spec. This PR removes our partial implementation from the codebase.
https://github.com/ethereum/consensus-specs/pull/4393
This commit is contained in:
Jimmy Chen
2025-07-23 13:24:45 +10:00
committed by GitHub
parent c4b973f5ba
commit 4daa015971
17 changed files with 11 additions and 1509 deletions

View File

@@ -38,7 +38,6 @@ use super::block_lookups::BlockLookups;
use super::network_context::{
CustodyByRootResult, RangeBlockComponent, RangeRequestId, RpcEvent, SyncNetworkContext,
};
use super::peer_sampling::{Sampling, SamplingConfig, SamplingResult};
use super::peer_sync_info::{remote_sync_type, PeerSyncType};
use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH};
use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProcessor};
@@ -58,7 +57,7 @@ use lighthouse_network::rpc::RPCError;
use lighthouse_network::service::api_types::{
BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, CustodyRequester,
DataColumnsByRangeRequestId, DataColumnsByRootRequestId, DataColumnsByRootRequester, Id,
SamplingId, SamplingRequester, SingleLookupReqId, SyncRequestId,
SingleLookupReqId, SyncRequestId,
};
use lighthouse_network::types::{NetworkGlobals, SyncState};
use lighthouse_network::SyncInfo;
@@ -69,14 +68,11 @@ use std::ops::Sub;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tracing::{debug, error, info, info_span, trace, warn, Instrument};
use tracing::{debug, error, info, info_span, trace, Instrument};
use types::{
BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock, Slot,
};
#[cfg(test)]
use types::ColumnIndex;
/// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync
/// from a peer. If a peer is within this tolerance (forwards or backwards), it is treated as a
/// fully sync'd peer.
@@ -146,10 +142,6 @@ pub enum SyncMessage<E: EthSpec> {
/// manager to attempt to find the block matching the unknown hash.
UnknownBlockHashFromAttestation(PeerId, Hash256),
/// Request to start sampling a block. Caller should ensure that block has data before sending
/// the request.
SampleBlock(Hash256, Slot),
/// A peer has disconnected.
Disconnect(PeerId),
@@ -172,12 +164,6 @@ pub enum SyncMessage<E: EthSpec> {
result: BlockProcessingResult,
},
/// Sample data column verified
SampleVerified {
id: SamplingId,
result: Result<(), String>,
},
/// A block from gossip has completed processing,
GossipBlockProcessResult { block_root: Hash256, imported: bool },
}
@@ -248,8 +234,6 @@ pub struct SyncManager<T: BeaconChainTypes> {
/// may forward us thousands of a attestations, each one triggering an individual event. Only
/// one event is useful, the rest generating log noise and wasted cycles
notified_unknown_roots: LRUTimeCache<(PeerId, Hash256)>,
sampling: Sampling<T>,
}
/// Spawns a new `SyncManager` thread which has a weak reference to underlying beacon
@@ -274,7 +258,6 @@ pub fn spawn<T: BeaconChainTypes>(
network_send,
beacon_processor,
sync_recv,
SamplingConfig::Default,
fork_context,
);
@@ -296,7 +279,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
beacon_processor: Arc<NetworkBeaconProcessor<T>>,
sync_recv: mpsc::UnboundedReceiver<SyncMessage<T::EthSpec>>,
sampling_config: SamplingConfig,
fork_context: Arc<ForkContext>,
) -> Self {
let network_globals = beacon_processor.network_globals.clone();
@@ -315,7 +297,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
notified_unknown_roots: LRUTimeCache::new(Duration::from_secs(
NOTIFIED_UNKNOWN_ROOT_EXPIRY_SECONDS,
)),
sampling: Sampling::new(sampling_config),
}
}
@@ -360,20 +341,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
self.block_lookups.insert_failed_chain(block_root);
}
#[cfg(test)]
pub(crate) fn active_sampling_requests(&self) -> Vec<Hash256> {
self.sampling.active_sampling_requests()
}
#[cfg(test)]
pub(crate) fn get_sampling_request_status(
&self,
block_root: Hash256,
index: &ColumnIndex,
) -> Option<super::peer_sampling::Status> {
self.sampling.get_request_status(block_root, index)
}
#[cfg(test)]
pub(crate) fn update_execution_engine_state(&mut self, state: EngineState) {
self.handle_new_execution_engine_state(state);
@@ -853,15 +820,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
self.handle_unknown_block_root(peer_id, block_root);
}
}
SyncMessage::SampleBlock(block_root, block_slot) => {
debug!(%block_root, slot = %block_slot, "Received SampleBlock message");
if let Some((requester, result)) = self
.sampling
.on_new_sample_request(block_root, &mut self.network)
{
self.on_sampling_result(requester, result)
}
}
SyncMessage::Disconnect(peer_id) => {
debug!(%peer_id, "Received disconnected message");
self.peer_disconnect(&peer_id);
@@ -911,14 +869,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}
},
SyncMessage::SampleVerified { id, result } => {
if let Some((requester, result)) =
self.sampling
.on_sample_verified(id, result, &mut self.network)
{
self.on_sampling_result(requester, result)
}
}
}
}
@@ -1175,14 +1125,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
.on_data_columns_by_root_response(req_id, peer_id, data_column)
{
match req_id.requester {
DataColumnsByRootRequester::Sampling(id) => {
if let Some((requester, result)) =
self.sampling
.on_sample_downloaded(id, peer_id, resp, &mut self.network)
{
self.on_sampling_result(requester, result)
}
}
DataColumnsByRootRequester::Custody(custody_id) => {
if let Some(result) = self
.network
@@ -1256,31 +1198,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
);
}
/// Handles the terminal result of a sampling request for `requester`.
///
/// On success, forwards a "sampling completed" message to the beacon processor
/// (so fork-choice can mark the sampled block's branch as safe); on failure it
/// only logs a warning — failed sampling is retried via future triggers.
fn on_sampling_result(&mut self, requester: SamplingRequester, result: SamplingResult) {
    match requester {
        SamplingRequester::ImportedBlock(block_root) => {
            debug!(%block_root, ?result, "Sampling result");
            match result {
                Ok(_) => {
                    // Notify the fork-choice of a successful sampling result to mark the block
                    // branch as safe.
                    if let Err(e) = self
                        .network
                        .beacon_processor()
                        .send_sampling_completed(block_root)
                    {
                        // Send failure is non-fatal: only log it.
                        warn!(?block_root, reason = ?e, "Error sending sampling result");
                    }
                }
                Err(e) => {
                    warn!(?block_root, reason = ?e, "Sampling failed");
                }
            }
        }
    }
}
/// Handles receiving a response for a range sync request that should have both blocks and
/// blobs.
fn on_range_components_response(

View File

@@ -6,12 +6,10 @@ mod block_lookups;
mod block_sidecar_coupling;
pub mod manager;
mod network_context;
mod peer_sampling;
mod peer_sync_info;
mod range_sync;
#[cfg(test)]
mod tests;
pub use lighthouse_network::service::api_types::SamplingId;
pub use manager::{BatchProcessResult, SyncMessage};
pub use range_sync::{BatchOperationOutcome, ChainId};

View File

@@ -1,735 +0,0 @@
use self::request::ActiveColumnSampleRequest;
#[cfg(test)]
pub(crate) use self::request::Status;
use super::network_context::{
DataColumnsByRootSingleBlockRequest, RpcResponseError, SyncNetworkContext,
};
use crate::metrics;
use beacon_chain::BeaconChainTypes;
use fnv::FnvHashMap;
use lighthouse_network::service::api_types::{
DataColumnsByRootRequester, SamplingId, SamplingRequestId, SamplingRequester,
};
use lighthouse_network::{PeerAction, PeerId};
use rand::{seq::SliceRandom, thread_rng};
use std::{
collections::hash_map::Entry, collections::HashMap, marker::PhantomData, sync::Arc,
time::Duration,
};
use tracing::{debug, error, instrument, warn};
use types::{data_column_sidecar::ColumnIndex, ChainSpec, DataColumnSidecar, Hash256};
/// Outcome of a completed sampling attempt: `Ok(())` on success, otherwise the terminal error.
pub type SamplingResult = Result<(), SamplingError>;
/// Batch of data column sidecars delivered by a single RPC response.
type DataColumnSidecarList<E> = Vec<Arc<DataColumnSidecar<E>>>;
/// Top-level tracker for all active data-availability sampling requests,
/// keyed by the entity that triggered them (currently only imported blocks).
pub struct Sampling<T: BeaconChainTypes> {
    // At most one active request per requester; duplicates are rejected on creation.
    requests: HashMap<SamplingRequester, ActiveSamplingRequest<T>>,
    // Controls how many verified columns are required for a request to succeed.
    sampling_config: SamplingConfig,
}
impl<T: BeaconChainTypes> Sampling<T> {
    /// Creates an empty sampling tracker with the given configuration.
    #[instrument(parent = None, fields(service = "sampling"), name = "sampling")]
    pub fn new(sampling_config: SamplingConfig) -> Self {
        Self {
            requests: <_>::default(),
            sampling_config,
        }
    }
    /// Returns the block roots of all currently active sampling requests (test-only).
    #[cfg(test)]
    #[instrument(parent = None,
        fields(service = "sampling"),
        name = "sampling",
        skip_all
    )]
    pub fn active_sampling_requests(&self) -> Vec<Hash256> {
        self.requests.values().map(|r| r.block_root).collect()
    }
    /// Returns the status of the column request for `index` within the request
    /// tracking `block_root`, if any (test-only).
    #[cfg(test)]
    #[instrument(parent = None,
        fields(service = "sampling"),
        name = "sampling",
        skip_all
    )]
    pub fn get_request_status(
        &self,
        block_root: Hash256,
        index: &ColumnIndex,
    ) -> Option<self::request::Status> {
        let requester = SamplingRequester::ImportedBlock(block_root);
        self.requests
            .get(&requester)
            .and_then(|req| req.get_request_status(index))
    }
    /// Create a new sampling request for a known block
    ///
    /// ### Returns
    ///
    /// - `Some`: Request completed, won't make more progress. Expect requester to act on the result.
    /// - `None`: Request still active, requester should do no action
    #[instrument(parent = None,
        fields(service = "sampling"),
        name = "sampling",
        skip_all
    )]
    pub fn on_new_sample_request(
        &mut self,
        block_root: Hash256,
        cx: &mut SyncNetworkContext<T>,
    ) -> Option<(SamplingRequester, SamplingResult)> {
        let id = SamplingRequester::ImportedBlock(block_root);
        let request = match self.requests.entry(id) {
            Entry::Vacant(e) => e.insert(ActiveSamplingRequest::new(
                block_root,
                id,
                &self.sampling_config,
                &cx.chain.spec,
            )),
            Entry::Occupied(_) => {
                // Sampling is triggered from multiple sources, duplicate sampling requests are
                // likely (gossip block + gossip data column)
                // TODO(das): Should track failed sampling request for some time? Otherwise there's
                // a risk of a loop with multiple triggers creating the request, then failing,
                // and repeat.
                debug!(?id, "Ignoring duplicate sampling request");
                return None;
            }
        };
        debug!(
            ?id,
            column_selection = ?request.column_selection(),
            "Created new sample request"
        );
        // TODO(das): If a node has very few peers, continue_sampling() will attempt to find enough
        // to sample here, immediately failing the sampling request. There should be some grace
        // period to allow the peer manager to find custody peers.
        let result = request.continue_sampling(cx);
        self.handle_sampling_result(result, &id)
    }
    /// Insert a downloaded column into an active sampling request. Then make progress on the
    /// entire request.
    ///
    /// ### Returns
    ///
    /// - `Some`: Request completed, won't make more progress. Expect requester to act on the result.
    /// - `None`: Request still active, requester should do no action
    #[instrument(parent = None,
        fields(service = "sampling"),
        name = "sampling",
        skip_all
    )]
    pub fn on_sample_downloaded(
        &mut self,
        id: SamplingId,
        peer_id: PeerId,
        resp: Result<(DataColumnSidecarList<T::EthSpec>, Duration), RpcResponseError>,
        cx: &mut SyncNetworkContext<T>,
    ) -> Option<(SamplingRequester, SamplingResult)> {
        let Some(request) = self.requests.get_mut(&id.id) else {
            // TODO(das): This log can happen if the request is error'ed early and dropped
            debug!(?id, "Sample downloaded event for unknown request");
            return None;
        };
        let result = request.on_sample_downloaded(peer_id, id.sampling_request_id, resp, cx);
        self.handle_sampling_result(result, &id.id)
    }
    /// Insert a column verification result into an active sampling request. Then make progress
    /// on the entire request.
    ///
    /// ### Returns
    ///
    /// - `Some`: Request completed, won't make more progress. Expect requester to act on the result.
    /// - `None`: Request still active, requester should do no action
    #[instrument(parent = None,
        fields(service = "sampling"),
        name = "sampling",
        skip_all
    )]
    pub fn on_sample_verified(
        &mut self,
        id: SamplingId,
        result: Result<(), String>,
        cx: &mut SyncNetworkContext<T>,
    ) -> Option<(SamplingRequester, SamplingResult)> {
        let Some(request) = self.requests.get_mut(&id.id) else {
            // TODO(das): This log can happen if the request is error'ed early and dropped
            debug!(?id, "Sample verified event for unknown request");
            return None;
        };
        let result = request.on_sample_verified(id.sampling_request_id, result, cx);
        self.handle_sampling_result(result, &id.id)
    }
    /// Converts a result from the internal format of `ActiveSamplingRequest` (error first to use ?
    /// conveniently), to an Option first format to use an `if let Some() { act on result }` pattern
    /// in the sync manager.
    ///
    /// A `Some` result is terminal: the request is removed and a metric recorded.
    #[instrument(parent = None,
        fields(service = "sampling"),
        name = "sampling",
        skip_all
    )]
    fn handle_sampling_result(
        &mut self,
        result: Result<Option<()>, SamplingError>,
        id: &SamplingRequester,
    ) -> Option<(SamplingRequester, SamplingResult)> {
        let result = result.transpose();
        if let Some(result) = result {
            debug!(?id, ?result, "Sampling request completed, removing");
            metrics::inc_counter_vec(
                &metrics::SAMPLING_REQUEST_RESULT,
                &[metrics::from_result(&result)],
            );
            self.requests.remove(id);
            Some((*id, result))
        } else {
            None
        }
    }
}
/// State of a single sampling attempt for one block, tracking per-column progress.
pub struct ActiveSamplingRequest<T: BeaconChainTypes> {
    // Root of the block whose data columns are being sampled.
    block_root: Hash256,
    // Identity of the entity that triggered this sampling request.
    requester_id: SamplingRequester,
    // Per-column state machines, keyed by column index.
    column_requests: FnvHashMap<ColumnIndex, ActiveColumnSampleRequest>,
    /// Mapping of column indexes for a sampling request.
    column_indexes_by_sampling_request: FnvHashMap<SamplingRequestId, Vec<ColumnIndex>>,
    /// Sequential ID for sampling requests.
    current_sampling_request_id: SamplingRequestId,
    // Pre-computed random ordering of all column indexes; columns are sampled
    // from the front of this list.
    column_shuffle: Vec<ColumnIndex>,
    // Number of verified columns required for success, indexed by current failure count.
    required_successes: Vec<usize>,
    _phantom: PhantomData<T>,
}
/// Terminal failure modes of a sampling request.
#[derive(Debug)]
pub enum SamplingError {
    // Failed to send a message to the network or the beacon processor.
    SendFailed(#[allow(dead_code)] &'static str),
    // Beacon processor is not available to verify downloaded columns.
    ProcessorUnavailable,
    // More column failures than the `required_successes` schedule tolerates.
    TooManyFailures,
    // A per-column state machine transition was invalid; carries a description.
    BadState(#[allow(dead_code)] String),
    // Column selection indexed past the shuffled column list (should never happen).
    ColumnIndexOutOfBounds,
}
/// Required success index by current failures, with p_target=5.00E-06
/// Ref: https://colab.research.google.com/drive/18uUgT2i-m3CbzQ5TyP9XFKqTn1DImUJD#scrollTo=E82ITcgB5ATh
// Entry `i` is the number of verified columns required when `i` columns have failed so far.
const REQUIRED_SUCCESSES: [usize; 11] = [16, 20, 23, 26, 29, 32, 34, 37, 39, 42, 44];
/// Selects the schedule of required successes used by sampling requests.
#[derive(Debug, Clone)]
pub enum SamplingConfig {
    // Use the hardcoded `REQUIRED_SUCCESSES` schedule.
    Default,
    // Custom schedule; used by tests to lower the required success count.
    #[allow(dead_code)]
    Custom {
        required_successes: Vec<usize>,
    },
}
impl<T: BeaconChainTypes> ActiveSamplingRequest<T> {
    /// Creates a new request for `block_root`, pre-selecting a random ordering of all
    /// column indexes from which samples will be drawn.
    fn new(
        block_root: Hash256,
        requester_id: SamplingRequester,
        sampling_config: &SamplingConfig,
        spec: &ChainSpec,
    ) -> Self {
        // Select ahead of time the full list of to-sample columns
        let mut column_shuffle =
            (0..spec.number_of_columns as ColumnIndex).collect::<Vec<ColumnIndex>>();
        let mut rng = thread_rng();
        column_shuffle.shuffle(&mut rng);
        Self {
            block_root,
            requester_id,
            column_requests: <_>::default(),
            column_indexes_by_sampling_request: <_>::default(),
            current_sampling_request_id: SamplingRequestId(0),
            column_shuffle,
            required_successes: match sampling_config {
                SamplingConfig::Default => REQUIRED_SUCCESSES.to_vec(),
                SamplingConfig::Custom { required_successes } => required_successes.clone(),
            },
            _phantom: PhantomData,
        }
    }
    /// Returns the status of the per-column request for `index`, if any (test-only).
    #[cfg(test)]
    pub fn get_request_status(&self, index: &ColumnIndex) -> Option<self::request::Status> {
        self.column_requests.get(index).map(|req| req.status())
    }
    /// Return the current ordered list of columns that this request has to sample to succeed
    pub(crate) fn column_selection(&self) -> Vec<ColumnIndex> {
        self.column_shuffle
            .iter()
            .take(REQUIRED_SUCCESSES[0])
            .copied()
            .collect()
    }
    /// Insert a downloaded column into an active sampling request. Then make progress on the
    /// entire request.
    ///
    /// ### Returns
    ///
    /// - `Err`: Sampling request has failed and will be dropped
    /// - `Ok(Some)`: Sampling request has successfully completed and will be dropped
    /// - `Ok(None)`: Sampling request still active
    pub(crate) fn on_sample_downloaded(
        &mut self,
        _peer_id: PeerId,
        sampling_request_id: SamplingRequestId,
        resp: Result<(DataColumnSidecarList<T::EthSpec>, Duration), RpcResponseError>,
        cx: &mut SyncNetworkContext<T>,
    ) -> Result<Option<()>, SamplingError> {
        // Overall flow:
        // - Select columns to sample
        // - Create individual request per column
        // - Progress requests
        // - If request fails retry or expand search
        // - If all good return
        let Some(column_indexes) = self
            .column_indexes_by_sampling_request
            .get(&sampling_request_id)
        else {
            error!(
                ?sampling_request_id,
                "Column indexes for the sampling request ID not found"
            );
            return Ok(None);
        };
        match resp {
            Ok((mut resp_data_columns, seen_timestamp)) => {
                let resp_column_indexes = resp_data_columns
                    .iter()
                    .map(|r| r.index)
                    .collect::<Vec<_>>();
                debug!(
                    block_root = %self.block_root,
                    column_indexes = ?resp_column_indexes,
                    count = resp_data_columns.len(),
                    "Sample download success"
                );
                metrics::inc_counter_vec(&metrics::SAMPLE_DOWNLOAD_RESULT, &[metrics::SUCCESS]);
                // Filter the data received in the response using the requested column indexes.
                let mut data_columns = vec![];
                for column_index in column_indexes {
                    let Some(request) = self.column_requests.get_mut(column_index) else {
                        warn!(
                            block_root = %self.block_root,
                            column_index,
                            "Active column sample request not found"
                        );
                        continue;
                    };
                    let Some(data_pos) = resp_data_columns
                        .iter()
                        .position(|data| &data.index == column_index)
                    else {
                        // Peer does not have the requested data, mark peer as "dont have" and try
                        // again with a different peer.
                        debug!(
                            block_root = %self.block_root,
                            column_index,
                            "Sampling peer claims to not have the data"
                        );
                        request.on_sampling_error()?;
                        continue;
                    };
                    data_columns.push(resp_data_columns.swap_remove(data_pos));
                }
                if !resp_data_columns.is_empty() {
                    // Anything left over after the filter above was unsolicited data.
                    let resp_column_indexes = resp_data_columns
                        .iter()
                        .map(|d| d.index)
                        .collect::<Vec<_>>();
                    debug!(
                        block_root = %self.block_root,
                        column_indexes = ?resp_column_indexes,
                        "Received data that was not requested"
                    );
                }
                // Handle the downloaded data columns.
                if data_columns.is_empty() {
                    debug!(block_root = %self.block_root, "Received empty response");
                    self.column_indexes_by_sampling_request
                        .remove(&sampling_request_id);
                } else {
                    // Overwrite `column_indexes` with the column indexes received in the response.
                    let column_indexes = data_columns.iter().map(|d| d.index).collect::<Vec<_>>();
                    self.column_indexes_by_sampling_request
                        .insert(sampling_request_id, column_indexes.clone());
                    // Peer has data column, send to verify
                    let Some(beacon_processor) = cx.beacon_processor_if_enabled() else {
                        // If processor is not available, error the entire sampling
                        debug!(
                            block = %self.block_root,
                            reason = "beacon processor unavailable",
                            "Dropping sampling"
                        );
                        return Err(SamplingError::ProcessorUnavailable);
                    };
                    debug!(
                        block = ?self.block_root,
                        ?column_indexes,
                        "Sending data_column for verification"
                    );
                    if let Err(e) = beacon_processor.send_rpc_validate_data_columns(
                        self.block_root,
                        data_columns,
                        seen_timestamp,
                        SamplingId {
                            id: self.requester_id,
                            sampling_request_id,
                        },
                    ) {
                        // Beacon processor is overloaded, drop sampling attempt. Failing to sample
                        // is not a permanent state so we should recover once the node has capacity
                        // and receives a descendant block.
                        error!(
                            block = %self.block_root,
                            reason = e.to_string(),
                            "Dropping sampling"
                        );
                        return Err(SamplingError::SendFailed("beacon processor send failure"));
                    }
                }
            }
            Err(err) => {
                debug!(
                    block_root = %self.block_root,
                    ?column_indexes,
                    error = ?err,
                    "Sample download error"
                );
                metrics::inc_counter_vec(&metrics::SAMPLE_DOWNLOAD_RESULT, &[metrics::FAILURE]);
                // Error downloading, malicious network errors are already penalized before
                // reaching this function. Mark the peer as failed and try again with another.
                for column_index in column_indexes {
                    let Some(request) = self.column_requests.get_mut(column_index) else {
                        warn!(
                            block_root = %self.block_root,
                            column_index,
                            "Active column sample request not found"
                        );
                        continue;
                    };
                    request.on_sampling_error()?;
                }
            }
        };
        self.continue_sampling(cx)
    }
    /// Insert a column verification result into an active sampling request. Then make progress
    /// on the entire request.
    ///
    /// ### Returns
    ///
    /// - `Err`: Sampling request has failed and will be dropped
    /// - `Ok(Some)`: Sampling request has successfully completed and will be dropped
    /// - `Ok(None)`: Sampling request still active
    pub(crate) fn on_sample_verified(
        &mut self,
        sampling_request_id: SamplingRequestId,
        result: Result<(), String>,
        cx: &mut SyncNetworkContext<T>,
    ) -> Result<Option<()>, SamplingError> {
        let Some(column_indexes) = self
            .column_indexes_by_sampling_request
            .get(&sampling_request_id)
        else {
            error!(
                ?sampling_request_id,
                "Column indexes for the sampling request ID not found"
            );
            return Ok(None);
        };
        match result {
            Ok(_) => {
                debug!(block_root = %self.block_root,?column_indexes, "Sample verification success");
                metrics::inc_counter_vec(&metrics::SAMPLE_VERIFY_RESULT, &[metrics::SUCCESS]);
                // Valid, continue_sampling will maybe consider sampling a success
                for column_index in column_indexes {
                    let Some(request) = self.column_requests.get_mut(column_index) else {
                        warn!(
                            block_root = %self.block_root, column_index,
                            "Active column sample request not found"
                        );
                        continue;
                    };
                    request.on_sampling_success()?;
                }
            }
            Err(err) => {
                debug!(block_root = %self.block_root, ?column_indexes, reason = ?err, "Sample verification failure");
                metrics::inc_counter_vec(&metrics::SAMPLE_VERIFY_RESULT, &[metrics::FAILURE]);
                // Peer sent invalid data, penalize and try again from different peer
                // TODO(das): Count individual failures
                for column_index in column_indexes {
                    let Some(request) = self.column_requests.get_mut(column_index) else {
                        warn!(
                            block_root = %self.block_root,
                            column_index,
                            "Active column sample request not found"
                        );
                        continue;
                    };
                    let peer_id = request.on_sampling_error()?;
                    cx.report_peer(
                        peer_id,
                        PeerAction::LowToleranceError,
                        "invalid data column",
                    );
                }
            }
        }
        self.continue_sampling(cx)
    }
    /// Drives the request forward: checks for terminal completion or failure, then issues
    /// new column requests grouped by destination peer, and logs if the request stalls.
    pub(crate) fn continue_sampling(
        &mut self,
        cx: &mut SyncNetworkContext<T>,
    ) -> Result<Option<()>, SamplingError> {
        // First check if sampling is completed, by computing `required_successes`
        let mut successes = 0;
        let mut failures = 0;
        let mut ongoings = 0;
        for request in self.column_requests.values() {
            if request.is_completed() {
                successes += 1;
            }
            if request.is_failed() {
                failures += 1;
            }
            if request.is_ongoing() {
                ongoings += 1;
            }
        }
        // If there are too many failures, consider the sampling failed
        let Some(required_successes) = self.required_successes.get(failures) else {
            return Err(SamplingError::TooManyFailures);
        };
        // If there are enough successes, consider the sampling complete
        if successes >= *required_successes {
            return Ok(Some(()));
        }
        // First, attempt to progress sampling by requesting more columns, so that request failures
        // are accounted for below.
        // Group the requested column indexes by the destination peer to batch sampling requests.
        let mut column_indexes_to_request = FnvHashMap::default();
        for idx in 0..*required_successes {
            // Re-request columns. Note: out of bounds error should never happen, inputs are hardcoded
            let column_index = *self
                .column_shuffle
                .get(idx)
                .ok_or(SamplingError::ColumnIndexOutOfBounds)?;
            let request = self
                .column_requests
                .entry(column_index)
                .or_insert(ActiveColumnSampleRequest::new(column_index));
            if request.is_ready_to_request() {
                if let Some(peer_id) = request.choose_peer(cx) {
                    let indexes = column_indexes_to_request.entry(peer_id).or_insert(vec![]);
                    indexes.push(column_index);
                }
            }
        }
        // Send requests.
        let mut sent_request = false;
        for (peer_id, column_indexes) in column_indexes_to_request {
            cx.data_column_lookup_request(
                DataColumnsByRootRequester::Sampling(SamplingId {
                    id: self.requester_id,
                    sampling_request_id: self.current_sampling_request_id,
                }),
                peer_id,
                DataColumnsByRootSingleBlockRequest {
                    block_root: self.block_root,
                    indices: column_indexes.clone(),
                },
                // false = We issue request to custodians who may or may not have received the
                // samples yet. We don't have any signal (like an attestation or status messages
                // that the custodian has received data).
                false,
            )
            .map_err(SamplingError::SendFailed)?;
            self.column_indexes_by_sampling_request
                .insert(self.current_sampling_request_id, column_indexes.clone());
            self.current_sampling_request_id.0 += 1;
            sent_request = true;
            // Update request status.
            for column_index in column_indexes {
                let Some(request) = self.column_requests.get_mut(&column_index) else {
                    continue;
                };
                request.on_start_sampling(peer_id)?;
            }
        }
        // Make sure that sampling doesn't stall, by ensuring that this sampling request will
        // receive a new event of some type. If there are no ongoing requests, and no new
        // request was sent, loop to increase the required_successes until the sampling fails if
        // there are no peers.
        if ongoings == 0 && !sent_request {
            debug!(block_root = %self.block_root, "Sampling request stalled");
        }
        Ok(None)
    }
}
mod request {
    use super::SamplingError;
    use crate::sync::network_context::SyncNetworkContext;
    use beacon_chain::BeaconChainTypes;
    use lighthouse_network::PeerId;
    use rand::seq::SliceRandom;
    use rand::thread_rng;
    use std::collections::HashSet;
    use types::data_column_sidecar::ColumnIndex;

    /// State machine for sampling a single column index of one block.
    ///
    /// Lifecycle: `NotStarted` -> `Sampling(peer)` -> `Verified`, with a detour
    /// back to `NotStarted` on download/verification failure and to `NoPeers`
    /// when no suitable custodial peer is available.
    pub(crate) struct ActiveColumnSampleRequest {
        column_index: ColumnIndex,
        status: Status,
        // TODO(das): Should downscore peers that claim to not have the sample?
        peers_dont_have: HashSet<PeerId>,
    }

    // Exposed only for testing assertions in lookup tests
    #[derive(Debug, Clone)]
    pub(crate) enum Status {
        NoPeers,
        NotStarted,
        Sampling(PeerId),
        Verified,
    }

    impl ActiveColumnSampleRequest {
        /// Builds a fresh, not-yet-started request for `column_index`.
        pub(crate) fn new(column_index: ColumnIndex) -> Self {
            ActiveColumnSampleRequest {
                status: Status::NotStarted,
                peers_dont_have: HashSet::new(),
                column_index,
            }
        }

        /// True once the column has been downloaded and verified.
        pub(crate) fn is_completed(&self) -> bool {
            matches!(self.status, Status::Verified)
        }

        /// True when the request cannot progress because no peer has the column.
        pub(crate) fn is_failed(&self) -> bool {
            matches!(self.status, Status::NoPeers)
        }

        /// True while a download from some peer is in flight.
        pub(crate) fn is_ongoing(&self) -> bool {
            matches!(self.status, Status::Sampling(_))
        }

        /// True when a (re-)request may be issued for this column.
        pub(crate) fn is_ready_to_request(&self) -> bool {
            matches!(self.status, Status::NoPeers | Status::NotStarted)
        }

        /// Returns a clone of the current status (test-only).
        #[cfg(test)]
        pub(crate) fn status(&self) -> Status {
            self.status.clone()
        }

        /// Picks a random custodial peer for this column, excluding peers that
        /// previously claimed not to have it. Transitions to `NoPeers` when no
        /// candidate remains.
        pub(crate) fn choose_peer<T: BeaconChainTypes>(
            &mut self,
            cx: &SyncNetworkContext<T>,
        ) -> Option<PeerId> {
            // TODO: When there is a fork and only a subset of your peers know about a block,
            // sampling should only be queried on the peers on that fork. Should this case be
            // handled? How to handle it?
            let candidates: Vec<PeerId> = cx
                .get_custodial_peers(self.column_index)
                .into_iter()
                .filter(|peer| !self.peers_dont_have.contains(peer))
                .collect();
            match candidates.choose(&mut thread_rng()) {
                Some(peer) => Some(*peer),
                None => {
                    self.status = Status::NoPeers;
                    None
                }
            }
        }

        /// Marks the column as being downloaded from `peer_id`.
        /// Errors if the request is not in a startable state.
        pub(crate) fn on_start_sampling(&mut self, peer_id: PeerId) -> Result<(), SamplingError> {
            let other = self.status.clone();
            if matches!(other, Status::NoPeers | Status::NotStarted) {
                self.status = Status::Sampling(peer_id);
                return Ok(());
            }
            Err(SamplingError::BadState(format!(
                "bad state on_start_sampling expected NoPeers|NotStarted got {other:?}. column_index:{}",
                self.column_index
            )))
        }

        /// Records a download/verification failure: remembers the failing peer so
        /// it is not retried, and resets the request so another peer can be tried.
        /// Returns the peer that failed. Errors if no download was in flight.
        pub(crate) fn on_sampling_error(&mut self) -> Result<PeerId, SamplingError> {
            let other = self.status.clone();
            if let Status::Sampling(peer_id) = other {
                self.peers_dont_have.insert(peer_id);
                self.status = Status::NotStarted;
                return Ok(peer_id);
            }
            Err(SamplingError::BadState(format!(
                "bad state on_sampling_error expected Sampling got {other:?}. column_index:{}",
                self.column_index
            )))
        }

        /// Marks the in-flight download as verified.
        /// Errors if no download was in flight.
        pub(crate) fn on_sampling_success(&mut self) -> Result<(), SamplingError> {
            if matches!(self.status, Status::Sampling(_)) {
                self.status = Status::Verified;
                return Ok(());
            }
            Err(SamplingError::BadState(format!(
                "bad state on_sampling_success expected Sampling got {:?}. column_index:{}",
                self.status, self.column_index
            )))
        }
    }
}

View File

@@ -4,8 +4,7 @@ use crate::sync::block_lookups::{
};
use crate::sync::{
manager::{BlockProcessType, BlockProcessingResult, SyncManager},
peer_sampling::SamplingConfig,
SamplingId, SyncMessage,
SyncMessage,
};
use crate::NetworkMessage;
use std::sync::Arc;
@@ -33,7 +32,7 @@ use lighthouse_network::{
rpc::{RPCError, RequestType, RpcErrorResponse},
service::api_types::{
AppRequestId, DataColumnsByRootRequestId, DataColumnsByRootRequester, Id,
SamplingRequester, SingleLookupReqId, SyncRequestId,
SingleLookupReqId, SyncRequestId,
},
types::SyncState,
NetworkConfig, NetworkGlobals, PeerId,
@@ -50,7 +49,6 @@ use types::{
const D: Duration = Duration::new(0, 0);
const PARENT_FAIL_TOLERANCE: u8 = SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS;
const SAMPLING_REQUIRED_SUCCESSES: usize = 2;
type DCByRootIds = Vec<DCByRootId>;
type DCByRootId = (SyncRequestId, Vec<ColumnIndex>);
@@ -124,9 +122,6 @@ impl TestRig {
beacon_processor.into(),
// Pass empty recv not tied to any tx
mpsc::unbounded_channel().1,
SamplingConfig::Custom {
required_successes: vec![SAMPLING_REQUIRED_SUCCESSES],
},
fork_context,
),
harness,
@@ -180,10 +175,6 @@ impl TestRig {
));
}
fn trigger_sample_block(&mut self, block_root: Hash256, block_slot: Slot) {
self.send_sync_message(SyncMessage::SampleBlock(block_root, block_slot))
}
/// Drain all sync messages in the sync_rx attached to the beacon processor
fn drain_sync_rx(&mut self) {
while let Ok(sync_message) = self.sync_rx.try_recv() {
@@ -260,27 +251,6 @@ impl TestRig {
);
}
fn expect_no_active_sampling(&mut self) {
assert_eq!(
self.sync_manager.active_sampling_requests(),
Vec::<Hash256>::new(),
"expected no active sampling"
);
}
fn expect_active_sampling(&mut self, block_root: &Hash256) {
assert!(self
.sync_manager
.active_sampling_requests()
.contains(block_root));
}
fn expect_clean_finished_sampling(&mut self) {
self.expect_empty_network();
self.expect_sampling_result_work();
self.expect_no_active_sampling();
}
fn assert_parent_lookups_count(&self, count: usize) {
assert_eq!(
self.active_parent_lookups_count(),
@@ -613,39 +583,6 @@ impl TestRig {
})
}
fn return_empty_sampling_requests(&mut self, ids: DCByRootIds) {
for id in ids {
self.log(&format!("return empty data column for {id:?}"));
self.return_empty_sampling_request(id)
}
}
fn return_empty_sampling_request(&mut self, (sync_request_id, _): DCByRootId) {
let peer_id = PeerId::random();
// Send stream termination
self.send_sync_message(SyncMessage::RpcDataColumn {
sync_request_id,
peer_id,
data_column: None,
seen_timestamp: timestamp_now(),
});
}
fn sampling_requests_failed(
&mut self,
sampling_ids: DCByRootIds,
peer_id: PeerId,
error: RPCError,
) {
for (sync_request_id, _) in sampling_ids {
self.send_sync_message(SyncMessage::RpcError {
peer_id,
sync_request_id,
error: error.clone(),
})
}
}
fn complete_valid_block_request(
&mut self,
id: SingleLookupReqId,
@@ -672,51 +609,6 @@ impl TestRig {
)
}
fn complete_valid_sampling_column_requests(
&mut self,
ids: DCByRootIds,
data_columns: Vec<Arc<DataColumnSidecar<E>>>,
) {
for id in ids {
self.log(&format!("return valid data column for {id:?}"));
let indices = &id.1;
let columns_to_send = indices
.iter()
.map(|&i| data_columns[i as usize].clone())
.collect::<Vec<_>>();
self.complete_valid_sampling_column_request(id, &columns_to_send);
}
}
fn complete_valid_sampling_column_request(
&mut self,
id: DCByRootId,
data_columns: &[Arc<DataColumnSidecar<E>>],
) {
let first_dc = data_columns.first().unwrap();
let block_root = first_dc.block_root();
let sampling_request_id = match id.0 {
SyncRequestId::DataColumnsByRoot(DataColumnsByRootRequestId {
requester: DataColumnsByRootRequester::Sampling(sampling_id),
..
}) => sampling_id.sampling_request_id,
_ => unreachable!(),
};
self.complete_data_columns_by_root_request(id, data_columns);
// Expect work event
self.expect_rpc_sample_verify_work_event();
// Respond with valid result
self.send_sync_message(SyncMessage::SampleVerified {
id: SamplingId {
id: SamplingRequester::ImportedBlock(block_root),
sampling_request_id,
},
result: Ok(()),
})
}
fn complete_valid_custody_request(
&mut self,
ids: DCByRootIds,
@@ -1047,28 +939,7 @@ impl TestRig {
.unwrap_or_else(|e| panic!("Expected RPC custody column work: {e}"))
}
fn expect_rpc_sample_verify_work_event(&mut self) {
self.pop_received_processor_event(|ev| {
if ev.work_type() == beacon_processor::WorkType::RpcVerifyDataColumn {
Some(())
} else {
None
}
})
.unwrap_or_else(|e| panic!("Expected sample verify work: {e}"))
}
fn expect_sampling_result_work(&mut self) {
self.pop_received_processor_event(|ev| {
if ev.work_type() == beacon_processor::WorkType::SamplingResult {
Some(())
} else {
None
}
})
.unwrap_or_else(|e| panic!("Expected sampling result work: {e}"))
}
#[allow(dead_code)]
fn expect_no_work_event(&mut self) {
self.drain_processor_rx();
assert!(self.network_rx_queue.is_empty());
@@ -1280,46 +1151,6 @@ impl TestRig {
imported: false,
});
}
/// Asserts that the sampling request for every column index in `indices` is
/// currently in the `Sampling` (in-flight) state; panics otherwise.
fn assert_sampling_request_ongoing(&self, block_root: Hash256, indices: &[ColumnIndex]) {
    for index in indices {
        let status = match self
            .sync_manager
            .get_sampling_request_status(block_root, index)
        {
            Some(status) => status,
            None => panic!("No request state for {index}"),
        };
        match status {
            crate::sync::peer_sampling::Status::Sampling { .. } => {}
            _ => panic!("expected {block_root} {index} request to be on going: {status:?}"),
        }
    }
}
/// Asserts that the sampling request for every column index in `indices` has
/// stalled in the `NoPeers` state; panics otherwise.
fn assert_sampling_request_nopeers(&self, block_root: Hash256, indices: &[ColumnIndex]) {
    for index in indices {
        let status = match self
            .sync_manager
            .get_sampling_request_status(block_root, index)
        {
            Some(status) => status,
            None => panic!("No request state for {index}"),
        };
        match status {
            crate::sync::peer_sampling::Status::NoPeers => {}
            _ => panic!("expected {block_root} {index} request to be no peers: {status:?}"),
        }
    }
}
/// Logs the current sampling status of each column index in `indices` for the
/// given block root. Panics if any index has no request state at all.
fn log_sampling_requests(&self, block_root: Hash256, indices: &[ColumnIndex]) {
    let mut statuses = Vec::with_capacity(indices.len());
    for index in indices {
        let status = self
            .sync_manager
            .get_sampling_request_status(block_root, index)
            .unwrap_or_else(|| panic!("No request state for {index}"));
        statuses.push((index, status));
    }
    self.log(&format!(
        "Sampling request status for {block_root}: {statuses:?}"
    ));
}
}
#[test]
@@ -2074,137 +1905,6 @@ fn blobs_in_da_checker_skip_download() {
r.expect_no_active_lookups();
}
#[test]
fn sampling_happy_path() {
    // Sampling is a post-Fulu feature; skip the test if the fork isn't scheduled.
    let Some(mut rig) = TestRig::test_setup_after_fulu() else {
        return;
    };
    rig.new_connected_peers_for_peerdas();
    let (block, data_columns) = rig.rand_block_and_data_columns();
    let block_root = block.canonical_root();
    rig.trigger_sample_block(block_root, block.slot());
    // Retrieve all outgoing sample requests for random column indexes
    let sampling_ids =
        rig.expect_only_data_columns_by_root_requests(block_root, SAMPLING_REQUIRED_SUCCESSES);
    // Resolve all of them one by one
    rig.complete_valid_sampling_column_requests(sampling_ids, data_columns);
    rig.expect_clean_finished_sampling();
}
#[test]
fn sampling_with_retries() {
    // Sampling is a post-Fulu feature; skip the test if the fork isn't scheduled.
    let Some(mut rig) = TestRig::test_setup_after_fulu() else {
        return;
    };
    rig.new_connected_peers_for_peerdas();
    // Add another supernode to ensure that the node can retry.
    rig.new_connected_supernode_peer();
    let (block, data_columns) = rig.rand_block_and_data_columns();
    let block_root = block.canonical_root();
    rig.trigger_sample_block(block_root, block.slot());
    // Retrieve all outgoing sample requests for random column indexes, and return empty responses
    let first_attempt_ids =
        rig.expect_only_data_columns_by_root_requests(block_root, SAMPLING_REQUIRED_SUCCESSES);
    rig.return_empty_sampling_requests(first_attempt_ids);
    // Expect retries for all of them, and resolve them
    let retry_ids =
        rig.expect_only_data_columns_by_root_requests(block_root, SAMPLING_REQUIRED_SUCCESSES);
    rig.complete_valid_sampling_column_requests(retry_ids, data_columns);
    rig.expect_clean_finished_sampling();
}
#[test]
fn sampling_avoid_retrying_same_peer() {
    // Sampling is a post-Fulu feature; skip the test if the fork isn't scheduled.
    let Some(mut rig) = TestRig::test_setup_after_fulu() else {
        return;
    };
    let first_peer = rig.new_connected_supernode_peer();
    let second_peer = rig.new_connected_supernode_peer();
    let block_root = Hash256::random();
    rig.trigger_sample_block(block_root, Slot::new(0));
    // Retrieve all outgoing sample requests for random column indexes, and return empty responses
    let first_attempt_ids =
        rig.expect_only_data_columns_by_root_requests(block_root, SAMPLING_REQUIRED_SUCCESSES);
    rig.sampling_requests_failed(first_attempt_ids, first_peer, RPCError::Disconnected);
    // Should retry the other peer
    let retry_ids =
        rig.expect_only_data_columns_by_root_requests(block_root, SAMPLING_REQUIRED_SUCCESSES);
    rig.sampling_requests_failed(retry_ids, second_peer, RPCError::Disconnected);
    // Expect no more retries
    rig.expect_empty_network();
}
#[test]
fn sampling_batch_requests() {
    // Sampling is a post-Fulu feature; skip the test if the fork isn't scheduled.
    let Some(mut rig) = TestRig::test_setup_after_fulu() else {
        return;
    };
    let _supernode = rig.new_connected_supernode_peer();
    let (block, data_columns) = rig.rand_block_and_data_columns();
    let block_root = block.canonical_root();
    rig.trigger_sample_block(block_root, block.slot());
    // Retrieve the sample request, which should be batched.
    let (request_id, requested_indexes) = rig
        .expect_only_data_columns_by_root_requests(block_root, 1)
        .pop()
        .unwrap();
    assert_eq!(requested_indexes.len(), SAMPLING_REQUIRED_SUCCESSES);
    rig.assert_sampling_request_ongoing(block_root, &requested_indexes);
    // Resolve the request.
    rig.complete_valid_sampling_column_requests(
        vec![(request_id, requested_indexes.clone())],
        data_columns,
    );
    rig.expect_clean_finished_sampling();
}
#[test]
fn sampling_batch_requests_not_enough_responses_returned() {
    // Sampling is a post-Fulu feature; skip the test if the fork isn't scheduled.
    let Some(mut rig) = TestRig::test_setup_after_fulu() else {
        return;
    };
    let _supernode = rig.new_connected_supernode_peer();
    let (block, data_columns) = rig.rand_block_and_data_columns();
    let block_root = block.canonical_root();
    rig.trigger_sample_block(block_root, block.slot());
    // Retrieve the sample request, which should be batched.
    let (request_id, requested_indexes) = rig
        .expect_only_data_columns_by_root_requests(block_root, 1)
        .pop()
        .unwrap();
    assert_eq!(requested_indexes.len(), SAMPLING_REQUIRED_SUCCESSES);
    // The request status should be set to Sampling.
    rig.assert_sampling_request_ongoing(block_root, &requested_indexes);
    // Split the indexes to simulate the case where the supernode doesn't have the requested column.
    let (missing_indexes, available_indexes) = requested_indexes.split_at(1);
    // Complete the requests but only partially, so a NotEnoughResponsesReturned error occurs.
    let mut partial_response = Vec::new();
    for column in &data_columns {
        if available_indexes.contains(&column.index) {
            partial_response.push(column.clone());
        }
    }
    rig.complete_data_columns_by_root_request(
        (request_id, requested_indexes.clone()),
        &partial_response,
    );
    // The request status should be set to NoPeers since the supernode, the only peer, returned not enough responses.
    rig.log_sampling_requests(block_root, &requested_indexes);
    rig.assert_sampling_request_nopeers(block_root, missing_indexes);
    // The sampling request stalls.
    rig.expect_empty_network();
    rig.expect_no_work_event();
    rig.expect_active_sampling(&block_root);
}
#[test]
fn custody_lookup_happy_path() {
let Some(mut r) = TestRig::test_setup_after_fulu() else {
@@ -2233,9 +1933,6 @@ fn custody_lookup_happy_path() {
// - Respond with stream terminator
// ^ The stream terminator should be ignored and not close the next retry
// TODO(das): Test error early a sampling request and it getting drop + then receiving responses
// from pending requests.
mod deneb_only {
use super::*;
use beacon_chain::{