Merge remote-tracking branch 'sigp/peerdas-devnet-7' into peerdas-rangesync

This commit is contained in:
dapplion
2025-05-27 16:20:34 -05:00
54 changed files with 1090 additions and 572 deletions

View File

@@ -1380,9 +1380,11 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
peer_id: PeerId,
resp: RpcResponseResult<DataColumnSidecarList<T::EthSpec>>,
) -> Option<CustodyRequestResult<T::EthSpec>> {
let custody_by_range_id = req_id.parent_request_id;
// Note: need to remove the request to borrow self again below. Otherwise we can't
// do nested requests
let Some(mut request) = self.custody_by_range_requests.remove(&id.parent_request_id) else {
let Some(mut request) = self.custody_by_range_requests.remove(&custody_by_range_id) else {
metrics::inc_counter_vec(
&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS,
&["custody_by_range"],
@@ -1395,7 +1397,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.map_err(Into::<RpcResponseError>::into)
.transpose();
self.handle_custody_by_range_result(id.parent_request_id, request, result)
self.handle_custody_by_range_result(custody_by_range_id, request, result)
}
fn handle_custody_by_range_result(

View File

@@ -121,6 +121,8 @@ impl TestRig {
// deterministic seed
let rng = ChaCha20Rng::from_seed([0u8; 32]);
init_tracing();
TestRig {
beacon_processor_rx,
beacon_processor_rx_queue: vec![],
@@ -1154,7 +1156,7 @@ impl TestRig {
pub fn expect_no_penalty_for_anyone(&mut self) {
let downscore_events = self.filter_received_network_events(|ev| match ev {
NetworkMessage::ReportPeer { peer_id, msg, .. } => Some((peer_id, msg)),
NetworkMessage::ReportPeer { peer_id, msg, .. } => Some((*peer_id, *msg)),
_ => None,
});
if !downscore_events.is_empty() {

View File

@@ -8,16 +8,20 @@ use beacon_chain::test_utils::{BeaconChainHarness, EphemeralHarnessType};
use beacon_processor::WorkEvent;
use lighthouse_network::service::api_types::ComponentsByRangeRequestId;
use lighthouse_network::NetworkGlobals;
pub use lookups::PeersConfig;
use rand_chacha::ChaCha20Rng;
use slot_clock::ManualSlotClock;
use std::collections::HashMap;
use std::sync::Arc;
use std::fs::OpenOptions;
use std::io::Write;
use std::sync::{Arc, Once};
use store::MemoryStore;
use tokio::sync::mpsc;
use tracing_subscriber::fmt::MakeWriter;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use types::{ChainSpec, ForkName, MinimalEthSpec as E, SignedBeaconBlock};
pub use lookups::PeersConfig;
mod lookups;
mod range;
@@ -72,3 +76,55 @@ struct TestRig {
// Cache of sent blocks for PeerDAS responses
sent_blocks_by_range: HashMap<ComponentsByRangeRequestId, Vec<Arc<SignedBeaconBlock<E>>>>,
}
// Environment variable to read if `fork_from_env` feature is enabled.
pub const FORK_NAME_ENV_VAR: &str = "FORK_NAME";
// Environment variable specifying the log output directory in CI.
pub const CI_LOGGER_DIR_ENV_VAR: &str = "CI_LOGGER_DIR";
static INIT_TRACING: Once = Once::new();
pub fn init_tracing() {
INIT_TRACING.call_once(|| {
if std::env::var(CI_LOGGER_DIR_ENV_VAR).is_ok() {
// Enable logging to log files for each test and each fork.
tracing_subscriber::registry()
.with(
tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_writer(CILogWriter),
)
.init();
}
});
}
// CILogWriter writes logs to separate files for each test and each fork.
struct CILogWriter;
impl<'a> MakeWriter<'a> for CILogWriter {
type Writer = Box<dyn Write + Send>;
// fmt::Layer calls this method each time an event is recorded.
fn make_writer(&'a self) -> Self::Writer {
let log_dir = std::env::var(CI_LOGGER_DIR_ENV_VAR).unwrap();
let fork_name = std::env::var(FORK_NAME_ENV_VAR)
.map(|s| format!("{s}_"))
.unwrap_or_default();
// The current test name can be got via the thread name.
let test_name = std::thread::current()
.name()
.unwrap_or("unnamed")
.replace(|c: char| !c.is_alphanumeric(), "_");
let file_path = format!("{log_dir}/{fork_name}{test_name}.log");
let file = OpenOptions::new()
.append(true)
.create(true)
.open(&file_path)
.expect("failed to open a log file");
Box::new(file)
}
}