Fix finalized_sync_permanent_custody_peer_failure

This commit is contained in:
dapplion
2025-06-04 22:20:45 -06:00
parent 2b4a9bda44
commit ae0ef8f929
7 changed files with 66 additions and 62 deletions

View File

@@ -50,6 +50,7 @@ use crate::sync::block_lookups::{
BlobRequestState, BlockComponent, BlockRequestState, CustodyRequestState, DownloadResult,
};
use crate::sync::network_context::PeerGroup;
use crate::sync::range_sync::BATCH_BUFFER_SIZE;
use beacon_chain::block_verification_types::AsBlock;
use beacon_chain::validator_monitor::timestamp_now;
use beacon_chain::{
@@ -280,6 +281,7 @@ pub fn spawn<T: BeaconChainTypes>(
sync_recv,
SamplingConfig::Default,
fork_context,
BATCH_BUFFER_SIZE,
);
// spawn the sync manager thread
@@ -302,6 +304,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
sync_recv: mpsc::UnboundedReceiver<SyncMessage<T::EthSpec>>,
sampling_config: SamplingConfig,
fork_context: Arc<ForkContext>,
batch_buffer_size: usize,
) -> Self {
let network_globals = beacon_processor.network_globals.clone();
Self {
@@ -313,7 +316,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
beacon_chain.clone(),
fork_context.clone(),
),
range_sync: RangeSync::new(beacon_chain.clone()),
range_sync: RangeSync::new(beacon_chain.clone(), batch_buffer_size),
backfill_sync: BackFillSync::new(beacon_chain.clone(), network_globals),
block_lookups: BlockLookups::new(),
notified_unknown_roots: LRUTimeCache::new(Duration::from_secs(

View File

@@ -26,7 +26,7 @@ use types::{Epoch, EthSpec, Hash256, Slot};
pub const EPOCHS_PER_BATCH: u64 = 1;
/// The maximum number of batches to queue before requesting more.
const BATCH_BUFFER_SIZE: u8 = 5;
pub const BATCH_BUFFER_SIZE: usize = 5;
/// A return type for functions that act on a `Chain` which informs the caller whether the chain
/// has been completed and should be removed or to be kept if further processing is
@@ -119,6 +119,9 @@ pub struct SyncingChain<T: BeaconChainTypes> {
/// The current processing batch, if any.
current_processing_batch: Option<BatchId>,
/// The maximum number of batches to queue before requesting more.
batch_buffer_size: usize,
}
#[derive(PartialEq, Debug)]
@@ -147,6 +150,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
target_head_root: Hash256,
peer_id: PeerId,
chain_type: SyncingChainType,
batch_buffer_size: usize,
) -> Self {
SyncingChain {
id,
@@ -163,6 +167,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
attempted_optimistic_starts: HashSet::default(),
state: ChainSyncingState::Stopped,
current_processing_batch: None,
batch_buffer_size,
}
}
@@ -1075,7 +1080,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
.iter()
.filter(|&(_epoch, batch)| in_buffer(batch))
.count()
> BATCH_BUFFER_SIZE as usize
>= self.batch_buffer_size as usize
{
return None;
}
@@ -1105,28 +1110,28 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
/// batch states. See [BatchState::visualize] for symbol definitions.
#[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)]
fn visualize_batch_state(&self) -> String {
let mut visualization_string = String::with_capacity((BATCH_BUFFER_SIZE * 3) as usize);
let mut visualization_string = String::with_capacity((self.batch_buffer_size * 3) as usize);
// Start of the block
visualization_string.push('[');
for mut batch_index in 0..BATCH_BUFFER_SIZE {
for mut batch_index in 0..self.batch_buffer_size {
if let Some(batch) = self
.batches
.get(&(self.processing_target + batch_index as u64 * EPOCHS_PER_BATCH))
{
visualization_string.push(batch.visualize());
if batch_index != BATCH_BUFFER_SIZE {
if batch_index != self.batch_buffer_size {
// Add a comma in between elements
visualization_string.push(',');
}
} else {
// No batch exists, it is on our list to be downloaded
// Fill in the rest of the gaps
while batch_index < BATCH_BUFFER_SIZE {
while batch_index < self.batch_buffer_size {
visualization_string.push('E');
// Add a comma between the empty batches
if batch_index < BATCH_BUFFER_SIZE.saturating_sub(1) {
if batch_index < self.batch_buffer_size.saturating_sub(1) {
visualization_string.push(',')
}
batch_index += 1;

View File

@@ -51,6 +51,8 @@ pub struct ChainCollection<T: BeaconChainTypes> {
head_chains: FnvHashMap<ChainId, SyncingChain<T>>,
/// The current sync state of the process.
state: RangeSyncState,
/// The maximum number of batches to queue before requesting more.
batch_buffer_size: usize,
}
impl<T: BeaconChainTypes> ChainCollection<T> {
@@ -61,12 +63,13 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
.chain(self.head_chains.values())
}
pub fn new(beacon_chain: Arc<BeaconChain<T>>) -> Self {
pub fn new(beacon_chain: Arc<BeaconChain<T>>, batch_buffer_size: usize) -> Self {
ChainCollection {
beacon_chain,
finalized_chains: FnvHashMap::default(),
head_chains: FnvHashMap::default(),
state: RangeSyncState::Idle,
batch_buffer_size,
}
}
@@ -504,6 +507,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
target_head_root,
peer,
sync_type.into(),
self.batch_buffer_size,
);
debug!(

View File

@@ -10,6 +10,6 @@ mod sync_type;
pub use batch::{
BatchConfig, BatchInfo, BatchOperationOutcome, BatchPeers, BatchProcessingResult, BatchState,
};
pub use chain::{BatchId, ChainId, EPOCHS_PER_BATCH};
pub use chain::{BatchId, ChainId, BATCH_BUFFER_SIZE, EPOCHS_PER_BATCH};
pub use range::RangeSync;
pub use sync_type::RangeSyncType;

View File

@@ -90,10 +90,10 @@ where
name = "range_sync",
skip_all
)]
pub fn new(beacon_chain: Arc<BeaconChain<T>>) -> Self {
pub fn new(beacon_chain: Arc<BeaconChain<T>>, batch_buffer_size: usize) -> Self {
RangeSync {
beacon_chain: beacon_chain.clone(),
chains: ChainCollection::new(beacon_chain),
chains: ChainCollection::new(beacon_chain, batch_buffer_size),
failed_chains: LRUTimeCache::new(std::time::Duration::from_secs(
FAILED_CHAINS_EXPIRY_SECONDS,
)),

View File

@@ -2,6 +2,7 @@ use crate::network_beacon_processor::NetworkBeaconProcessor;
use crate::sync::block_lookups::{
BlockLookupSummary, PARENT_DEPTH_TOLERANCE, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS,
};
use crate::sync::range_sync::BATCH_BUFFER_SIZE;
use crate::sync::{
manager::{BlockProcessType, BlockProcessingResult, SyncManager},
peer_sampling::SamplingConfig,
@@ -59,16 +60,29 @@ pub enum PeersConfig {
SupernodeOnly,
}
pub struct TestOptions {
/// If the node created by this test harness is a supernode
pub is_supernode: bool,
/// The maximum number of batches to queue before requesting more.
pub batch_buffer_size: usize,
}
impl TestRig {
pub fn test_setup() -> Self {
Self::test_setup_with_options(false)
Self::test_setup_with_options(TestOptions {
is_supernode: false,
batch_buffer_size: BATCH_BUFFER_SIZE,
})
}
pub fn test_setup_as_supernode() -> Self {
Self::test_setup_with_options(true)
Self::test_setup_with_options(TestOptions {
is_supernode: true,
batch_buffer_size: BATCH_BUFFER_SIZE,
})
}
fn test_setup_with_options(is_supernode: bool) -> Self {
pub fn test_setup_with_options(options: TestOptions) -> Self {
// Use `fork_from_env` logic to set correct fork epochs
let spec = test_spec::<E>();
@@ -101,7 +115,7 @@ impl TestRig {
Vec::new(),
network_config,
chain.spec.clone(),
is_supernode,
options.is_supernode,
));
let (beacon_processor, beacon_processor_rx) = NetworkBeaconProcessor::null_for_testing(
globals,
@@ -143,6 +157,7 @@ impl TestRig {
required_successes: vec![SAMPLING_REQUIRED_SUCCESSES],
},
fork_context,
options.batch_buffer_size,
),
harness,
fork_name,

View File

@@ -4,12 +4,12 @@ use crate::status::ToStatusMessage;
use crate::sync::manager::SLOT_IMPORT_TOLERANCE;
use crate::sync::network_context::{BlockComponentsByRangeRequestStep, RangeRequestId};
use crate::sync::range_sync::{BatchId, BatchState, RangeSyncType};
use crate::sync::tests::lookups::TestOptions;
use crate::sync::{ChainId, SyncMessage};
use beacon_chain::data_column_verification::CustodyDataColumn;
use beacon_chain::test_utils::{test_spec, AttestationStrategy, BlockStrategy};
use beacon_chain::test_utils::{AttestationStrategy, BlockStrategy};
use beacon_chain::{block_verification_types::RpcBlock, EngineState, NotifyExecutionLayer};
use beacon_processor::WorkType;
use lighthouse_network::discovery::{peer_id_to_node_id, CombinedKey};
use lighthouse_network::rpc::methods::{
BlobsByRangeRequest, DataColumnsByRangeRequest, OldBlocksByRangeRequest,
};
@@ -19,16 +19,13 @@ use lighthouse_network::service::api_types::{
DataColumnsByRangeRequestId, SyncRequestId,
};
use lighthouse_network::types::SyncState;
use lighthouse_network::{Enr, EnrExt, PeerId, SyncInfo};
use rand::SeedableRng;
use rand_chacha::ChaCha20Rng;
use lighthouse_network::{PeerId, SyncInfo};
use std::collections::HashSet;
use std::time::Duration;
use types::data_column_custody_group::compute_subnets_for_node;
use types::{
BeaconBlock, BlobSidecarList, BlockImportSource, ColumnIndex, DataColumnSidecar,
DataColumnSubnetId, Epoch, EthSpec, Hash256, KzgCommitment, MinimalEthSpec as E, Signature,
SignedBeaconBlock, SignedBeaconBlockHash, Slot, VariableList,
BeaconBlock, BlobSidecarList, BlockImportSource, ColumnIndex, DataColumnSidecar, Epoch,
EthSpec, Hash256, KzgCommitment, MinimalEthSpec as E, Signature, SignedBeaconBlock,
SignedBeaconBlockHash, Slot, VariableList,
};
const D: Duration = Duration::new(0, 0);
@@ -93,6 +90,12 @@ struct RequestFilter {
column_index: Option<u64>,
}
const NO_FILTER: RequestFilter = RequestFilter {
peer: None,
epoch: None,
column_index: None,
};
impl RequestFilter {
fn peer(mut self, peer: PeerId) -> Self {
self.peer = Some(peer);
@@ -1094,7 +1097,7 @@ fn finalized_sync_not_enough_custody_peers_on_start(config: Config) {
// The SyncingChain has a single peer, so it can issue blocks_by_range requests. However, it
// doesn't have enough peers to cover all columns
r.progress_until_no_events(filter(), complete());
r.progress_until_no_events(NO_FILTER, complete());
r.expect_no_active_rpc_requests();
// Here we have a batch with partially completed block_components_by_range requests. The batch
@@ -1108,7 +1111,7 @@ fn finalized_sync_not_enough_custody_peers_on_start(config: Config) {
// We still need to add enough peers to trigger batch downloads with idle peers. Same issue as
// the test above.
r.progress_until_no_events(filter(), complete());
r.progress_until_no_events(NO_FILTER, complete());
r.expect_no_active_rpc_requests();
r.expect_no_active_block_components_by_range_requests();
// TODO(das): For now these tests don't complete sync. We can't track beacon processor Work
@@ -1134,7 +1137,7 @@ fn finalized_sync_single_custody_peer_failure() {
// Progress all blocks_by_range and columns_by_range requests but respond empty for a single
// column index
r.progress_until_no_events(
filter(),
NO_FILTER,
complete().custody_failure_at_index(column_index_to_fail),
);
r.expect_penalties("custody_failure");
@@ -1162,7 +1165,13 @@ fn finalized_sync_single_custody_peer_failure() {
#[test]
fn finalized_sync_permanent_custody_peer_failure() {
let mut r = TestRig::test_setup();
let mut r = TestRig::test_setup_with_options(TestOptions {
is_supernode: false,
// The default buffer size is 5, but we want to manually complete only the batch for epoch
// 0. By setting this buffer to 1 sync will create a single batch until it completes. We can
// do better assertions of state assuming there's only one batch and logs are cleaner.
batch_buffer_size: 1,
});
// Only run post-PeerDAS
if !r.fork_name.fulu_enabled() {
return;
@@ -1192,7 +1201,7 @@ fn finalized_sync_permanent_custody_peer_failure() {
// Some peer had a custody failure at `column_index` so sync should do a single extra request
// for that index and epoch. We want to make sure that the request goes to a different peer
// than the attempts before.
// than the attempted before.
let reqs =
r.find_data_by_range_request(filter().epoch(0).column_index(column_index_to_fail));
let req_peer = reqs.peer();
@@ -1212,35 +1221,3 @@ fn finalized_sync_permanent_custody_peer_failure() {
// custody_by_range request is still active waiting for a new peer to connect
r.expect_active_block_components_by_range_request_on_custody_step();
}
#[test]
#[ignore]
fn mine_peerids() {
let spec = test_spec::<E>();
let mut rng = ChaCha20Rng::from_seed([0u8; 32]);
let expected_subnets = (0..3)
.map(|i| DataColumnSubnetId::new(i as u64))
.collect::<HashSet<_>>();
for i in 0..usize::MAX {
let key: CombinedKey = k256::ecdsa::SigningKey::random(&mut rng).into();
let enr = Enr::builder().build(&key).unwrap();
let peer_id = enr.peer_id();
// Use default custody groups count
let node_id = peer_id_to_node_id(&peer_id).expect("convert peer_id to node_id");
let subnets = compute_subnets_for_node(node_id.raw(), spec.custody_requirement, &spec)
.expect("should compute custody subnets");
if expected_subnets == subnets {
panic!("{:?}", subnets);
} else {
let matches = expected_subnets
.iter()
.filter(|index| subnets.contains(index))
.count();
if matches > 0 {
println!("{i} {:?}", matches);
}
}
}
}