mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-04 01:01:41 +00:00
Compare commits
10 Commits
progressiv
...
release-v8
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
95f12d0927 | ||
|
|
8cf6ffac4b | ||
|
|
4588971085 | ||
|
|
561898fc1c | ||
|
|
be799cb2ad | ||
|
|
c5b4580e37 | ||
|
|
691c8cf8e6 | ||
|
|
c61665b3a1 | ||
|
|
d4ec006a34 | ||
|
|
9065e4a56e |
39
Cargo.lock
generated
39
Cargo.lock
generated
@@ -4,7 +4,7 @@ version = 4
|
||||
|
||||
[[package]]
|
||||
name = "account_manager"
|
||||
version = "8.1.0"
|
||||
version = "8.1.1"
|
||||
dependencies = [
|
||||
"account_utils",
|
||||
"bls",
|
||||
@@ -1276,7 +1276,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "beacon_node"
|
||||
version = "8.1.0"
|
||||
version = "8.1.1"
|
||||
dependencies = [
|
||||
"account_utils",
|
||||
"beacon_chain",
|
||||
@@ -1513,7 +1513,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "boot_node"
|
||||
version = "8.1.0"
|
||||
version = "8.1.1"
|
||||
dependencies = [
|
||||
"beacon_node",
|
||||
"bytes",
|
||||
@@ -4832,9 +4832,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "keccak"
|
||||
version = "0.1.5"
|
||||
version = "0.1.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ecc2af9a1119c51f12a14607e783cb977bde58bc069ff0c3da1095e635d70654"
|
||||
checksum = "cb26cec98cce3a3d96cbb7bced3c4b16e3d13f27ec56dbd62cbc8f39cfb9d653"
|
||||
dependencies = [
|
||||
"cpufeatures",
|
||||
]
|
||||
@@ -4897,7 +4897,7 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
|
||||
|
||||
[[package]]
|
||||
name = "lcli"
|
||||
version = "8.1.0"
|
||||
version = "8.1.1"
|
||||
dependencies = [
|
||||
"account_utils",
|
||||
"beacon_chain",
|
||||
@@ -5383,7 +5383,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lighthouse"
|
||||
version = "8.1.0"
|
||||
version = "8.1.1"
|
||||
dependencies = [
|
||||
"account_manager",
|
||||
"account_utils",
|
||||
@@ -5515,7 +5515,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lighthouse_version"
|
||||
version = "8.1.0"
|
||||
version = "8.1.1"
|
||||
dependencies = [
|
||||
"regex",
|
||||
]
|
||||
@@ -6323,9 +6323,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "num-conv"
|
||||
version = "0.1.0"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
|
||||
checksum = "cf97ec579c3c42f953ef76dbf8d55ac91fb219dde70e49aa4a6b7d74e9919050"
|
||||
|
||||
[[package]]
|
||||
name = "num-integer"
|
||||
@@ -8899,30 +8899,30 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "time"
|
||||
version = "0.3.44"
|
||||
version = "0.3.47"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "91e7d9e3bb61134e77bde20dd4825b97c010155709965fedf0f49bb138e52a9d"
|
||||
checksum = "743bd48c283afc0388f9b8827b976905fb217ad9e647fae3a379a9283c4def2c"
|
||||
dependencies = [
|
||||
"deranged",
|
||||
"itoa",
|
||||
"num-conv",
|
||||
"powerfmt",
|
||||
"serde",
|
||||
"serde_core",
|
||||
"time-core",
|
||||
"time-macros",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "time-core"
|
||||
version = "0.1.6"
|
||||
version = "0.1.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "40868e7c1d2f0b8d73e4a8c7f0ff63af4f6d19be117e90bd73eb1d62cf831c6b"
|
||||
checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca"
|
||||
|
||||
[[package]]
|
||||
name = "time-macros"
|
||||
version = "0.2.24"
|
||||
version = "0.2.27"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "30cfb0125f12d9c277f35663a0a33f8c30190f4e4574868a330595412d34ebf3"
|
||||
checksum = "2e70e4c5a0e0a8a4823ad65dfe1a6930e4f4d756dcd9dd7939022b5e8c501215"
|
||||
dependencies = [
|
||||
"num-conv",
|
||||
"time-core",
|
||||
@@ -9622,7 +9622,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "validator_client"
|
||||
version = "8.1.0"
|
||||
version = "8.1.1"
|
||||
dependencies = [
|
||||
"account_utils",
|
||||
"beacon_node_fallback",
|
||||
@@ -10607,8 +10607,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "yamux"
|
||||
version = "0.13.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "deab71f2e20691b4728b349c6cee8fc7223880fa67b6b4f92225ec32225447e5"
|
||||
source = "git+https://github.com/sigp/rust-yamux?rev=575b17c0f44f4253079a6bafaa2de74ca1d6dfaa#575b17c0f44f4253079a6bafaa2de74ca1d6dfaa"
|
||||
dependencies = [
|
||||
"futures",
|
||||
"log",
|
||||
|
||||
@@ -91,7 +91,7 @@ resolver = "2"
|
||||
|
||||
[workspace.package]
|
||||
edition = "2024"
|
||||
version = "8.1.0"
|
||||
version = "8.1.1"
|
||||
|
||||
[workspace.dependencies]
|
||||
account_utils = { path = "common/account_utils" }
|
||||
@@ -303,3 +303,4 @@ debug = true
|
||||
|
||||
[patch.crates-io]
|
||||
quick-protobuf = { git = "https://github.com/sigp/quick-protobuf.git", rev = "681f413312404ab6e51f0b46f39b0075c6f4ebfd" }
|
||||
yamux = { git = "https://github.com/sigp/rust-yamux", rev = "575b17c0f44f4253079a6bafaa2de74ca1d6dfaa" }
|
||||
|
||||
@@ -918,6 +918,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
.start_slot(T::EthSpec::slots_per_epoch()),
|
||||
);
|
||||
|
||||
self.observed_column_sidecars.write().prune(
|
||||
new_view
|
||||
.finalized_checkpoint
|
||||
.epoch
|
||||
.start_slot(T::EthSpec::slots_per_epoch()),
|
||||
);
|
||||
|
||||
self.observed_slashable.write().prune(
|
||||
new_view
|
||||
.finalized_checkpoint
|
||||
|
||||
@@ -457,6 +457,9 @@ fn handle_error<T>(
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
// All snappy errors from the snap crate bubble up as `Other` kind errors
|
||||
// that imply invalid response
|
||||
ErrorKind::Other => Err(RPCError::InvalidData(err.to_string())),
|
||||
_ => Err(RPCError::from(err)),
|
||||
}
|
||||
}
|
||||
@@ -2317,4 +2320,43 @@ mod tests {
|
||||
RPCError::InvalidData(_)
|
||||
));
|
||||
}
|
||||
|
||||
/// Test invalid snappy response.
|
||||
#[test]
|
||||
fn test_invalid_snappy_response() {
|
||||
let spec = spec_with_all_forks_enabled();
|
||||
let fork_ctx = Arc::new(fork_context(ForkName::latest(), &spec));
|
||||
let max_packet_size = spec.max_payload_size as usize; // 10 MiB.
|
||||
|
||||
let protocol = ProtocolId::new(SupportedProtocol::BlocksByRangeV2, Encoding::SSZSnappy);
|
||||
|
||||
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(
|
||||
protocol.clone(),
|
||||
max_packet_size,
|
||||
fork_ctx.clone(),
|
||||
);
|
||||
|
||||
let mut payload = BytesMut::new();
|
||||
payload.extend_from_slice(&[0u8]);
|
||||
let deneb_epoch = spec.deneb_fork_epoch.unwrap();
|
||||
payload.extend_from_slice(&fork_ctx.context_bytes(deneb_epoch));
|
||||
|
||||
// Claim the MAXIMUM allowed size (10 MiB)
|
||||
let claimed_size = max_packet_size;
|
||||
let mut uvi_codec: Uvi<usize> = Uvi::default();
|
||||
uvi_codec.encode(claimed_size, &mut payload).unwrap();
|
||||
payload.extend_from_slice(&[0xBB; 16]); // Junk snappy.
|
||||
|
||||
let result = codec.decode(&mut payload);
|
||||
|
||||
assert!(result.is_err(), "Expected decode to fail");
|
||||
|
||||
// IoError = reached snappy decode (allocation happened).
|
||||
let err = result.unwrap_err();
|
||||
assert!(
|
||||
matches!(err, RPCError::InvalidData(_)),
|
||||
"Should return invalid data variant {}",
|
||||
err
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,7 +13,8 @@ use futures::prelude::*;
|
||||
use libp2p::PeerId;
|
||||
use libp2p::swarm::handler::{
|
||||
ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError,
|
||||
FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol,
|
||||
FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, StreamUpgradeError,
|
||||
SubstreamProtocol,
|
||||
};
|
||||
use libp2p::swarm::{ConnectionId, Stream};
|
||||
use logging::crit;
|
||||
@@ -888,6 +889,16 @@ where
|
||||
ConnectionEvent::DialUpgradeError(DialUpgradeError { info, error }) => {
|
||||
self.on_dial_upgrade_error(info, error)
|
||||
}
|
||||
ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
|
||||
error: (proto, error),
|
||||
..
|
||||
}) => {
|
||||
self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound {
|
||||
id: self.current_inbound_substream_id,
|
||||
proto,
|
||||
error,
|
||||
}));
|
||||
}
|
||||
_ => {
|
||||
// NOTE: ConnectionEvent is a non exhaustive enum so updates should be based on
|
||||
// release notes more than compiler feedback
|
||||
@@ -924,7 +935,7 @@ where
|
||||
request.count()
|
||||
)),
|
||||
}));
|
||||
return self.shutdown(None);
|
||||
return;
|
||||
}
|
||||
}
|
||||
RequestType::BlobsByRange(request) => {
|
||||
@@ -940,7 +951,7 @@ where
|
||||
max_allowed, max_requested_blobs
|
||||
)),
|
||||
}));
|
||||
return self.shutdown(None);
|
||||
return;
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
|
||||
@@ -675,7 +675,7 @@ where
|
||||
E: EthSpec,
|
||||
{
|
||||
type Output = InboundOutput<TSocket, E>;
|
||||
type Error = RPCError;
|
||||
type Error = (Protocol, RPCError);
|
||||
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
|
||||
|
||||
fn upgrade_inbound(self, socket: TSocket, protocol: ProtocolId) -> Self::Future {
|
||||
@@ -717,10 +717,12 @@ where
|
||||
)
|
||||
.await
|
||||
{
|
||||
Err(e) => Err(RPCError::from(e)),
|
||||
Err(e) => Err((versioned_protocol.protocol(), RPCError::from(e))),
|
||||
Ok((Some(Ok(request)), stream)) => Ok((request, stream)),
|
||||
Ok((Some(Err(e)), _)) => Err(e),
|
||||
Ok((None, _)) => Err(RPCError::IncompleteStream),
|
||||
Ok((Some(Err(e)), _)) => Err((versioned_protocol.protocol(), e)),
|
||||
Ok((None, _)) => {
|
||||
Err((versioned_protocol.protocol(), RPCError::IncompleteStream))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,8 +5,12 @@ use crate::common::spec_with_all_forks_enabled;
|
||||
use crate::common::{Protocol, build_tracing_subscriber};
|
||||
use bls::Signature;
|
||||
use fixed_bytes::FixedBytesExtended;
|
||||
use libp2p::PeerId;
|
||||
use lighthouse_network::rpc::{RequestType, methods::*};
|
||||
use lighthouse_network::service::api_types::AppRequestId;
|
||||
use lighthouse_network::service::api_types::{
|
||||
AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId,
|
||||
DataColumnsByRangeRequestId, DataColumnsByRangeRequester, RangeRequestId, SyncRequestId,
|
||||
};
|
||||
use lighthouse_network::{NetworkEvent, ReportSource, Response};
|
||||
use ssz::Encode;
|
||||
use ssz_types::{RuntimeVariableList, VariableList};
|
||||
@@ -1785,3 +1789,157 @@ fn test_active_requests() {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// Test that when a node receives an invalid BlocksByRange request exceeding the maximum count,
|
||||
// it bans the sender.
|
||||
#[test]
|
||||
fn test_request_too_large_blocks_by_range() {
|
||||
let spec = Arc::new(spec_with_all_forks_enabled());
|
||||
|
||||
test_request_too_large(
|
||||
AppRequestId::Sync(SyncRequestId::BlocksByRange(BlocksByRangeRequestId {
|
||||
id: 1,
|
||||
parent_request_id: ComponentsByRangeRequestId {
|
||||
id: 1,
|
||||
requester: RangeRequestId::RangeSync {
|
||||
chain_id: 1,
|
||||
batch_id: Epoch::new(1),
|
||||
},
|
||||
},
|
||||
})),
|
||||
RequestType::BlocksByRange(OldBlocksByRangeRequest::new(
|
||||
0,
|
||||
spec.max_request_blocks(ForkName::Base) as u64 + 1, // exceeds the max request defined in the spec.
|
||||
1,
|
||||
)),
|
||||
);
|
||||
}
|
||||
|
||||
// Test that when a node receives an invalid BlobsByRange request exceeding the maximum count,
|
||||
// it bans the sender.
|
||||
#[test]
|
||||
fn test_request_too_large_blobs_by_range() {
|
||||
let spec = Arc::new(spec_with_all_forks_enabled());
|
||||
|
||||
let max_request_blobs_count = spec.max_request_blob_sidecars(ForkName::Base) as u64
|
||||
/ spec.max_blobs_per_block_within_fork(ForkName::Base);
|
||||
test_request_too_large(
|
||||
AppRequestId::Sync(SyncRequestId::BlobsByRange(BlobsByRangeRequestId {
|
||||
id: 1,
|
||||
parent_request_id: ComponentsByRangeRequestId {
|
||||
id: 1,
|
||||
requester: RangeRequestId::RangeSync {
|
||||
chain_id: 1,
|
||||
batch_id: Epoch::new(1),
|
||||
},
|
||||
},
|
||||
})),
|
||||
RequestType::BlobsByRange(BlobsByRangeRequest {
|
||||
start_slot: 0,
|
||||
count: max_request_blobs_count + 1, // exceeds the max request defined in the spec.
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
// Test that when a node receives an invalid DataColumnsByRange request exceeding the columns count,
|
||||
// it bans the sender.
|
||||
#[test]
|
||||
fn test_request_too_large_data_columns_by_range() {
|
||||
test_request_too_large(
|
||||
AppRequestId::Sync(SyncRequestId::DataColumnsByRange(
|
||||
DataColumnsByRangeRequestId {
|
||||
id: 1,
|
||||
parent_request_id: DataColumnsByRangeRequester::ComponentsByRange(
|
||||
ComponentsByRangeRequestId {
|
||||
id: 1,
|
||||
requester: RangeRequestId::RangeSync {
|
||||
chain_id: 1,
|
||||
batch_id: Epoch::new(1),
|
||||
},
|
||||
},
|
||||
),
|
||||
peer: PeerId::random(),
|
||||
},
|
||||
)),
|
||||
RequestType::DataColumnsByRange(DataColumnsByRangeRequest {
|
||||
start_slot: 0,
|
||||
count: 0,
|
||||
// exceeds the max request defined in the spec.
|
||||
columns: vec![0; E::number_of_columns() + 1],
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
fn test_request_too_large(app_request_id: AppRequestId, request: RequestType<E>) {
|
||||
// Set up the logging.
|
||||
let log_level = "debug";
|
||||
let enable_logging = true;
|
||||
let _subscriber = build_tracing_subscriber(log_level, enable_logging);
|
||||
let rt = Arc::new(Runtime::new().unwrap());
|
||||
let spec = Arc::new(spec_with_all_forks_enabled());
|
||||
|
||||
rt.block_on(async {
|
||||
let (mut sender, mut receiver) = common::build_node_pair(
|
||||
Arc::downgrade(&rt),
|
||||
ForkName::Base,
|
||||
spec,
|
||||
Protocol::Tcp,
|
||||
false,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
// Build the sender future
|
||||
let sender_future = async {
|
||||
loop {
|
||||
match sender.next_event().await {
|
||||
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
|
||||
debug!(?request, %peer_id, "Sending RPC request");
|
||||
sender
|
||||
.send_request(peer_id, app_request_id, request.clone())
|
||||
.unwrap();
|
||||
}
|
||||
NetworkEvent::ResponseReceived {
|
||||
app_request_id,
|
||||
response,
|
||||
..
|
||||
} => {
|
||||
debug!(?app_request_id, ?response, "Received response");
|
||||
}
|
||||
NetworkEvent::RPCFailed { error, .. } => {
|
||||
// This variant should be unreachable, as the receiver doesn't respond with an error when a request exceeds the limit.
|
||||
debug!(?error, "RPC failed");
|
||||
unreachable!();
|
||||
}
|
||||
NetworkEvent::PeerDisconnected(peer_id) => {
|
||||
// The receiver should disconnect as a result of the invalid request.
|
||||
debug!(%peer_id, "Peer disconnected");
|
||||
// End the test.
|
||||
return;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
.instrument(info_span!("Sender"));
|
||||
|
||||
// Build the receiver future
|
||||
let receiver_future = async {
|
||||
loop {
|
||||
if let NetworkEvent::RequestReceived { .. } = receiver.next_event().await {
|
||||
// This event should be unreachable, as the handler drops the invalid request.
|
||||
unreachable!();
|
||||
}
|
||||
}
|
||||
}
|
||||
.instrument(info_span!("Receiver"));
|
||||
|
||||
tokio::select! {
|
||||
_ = sender_future => {}
|
||||
_ = receiver_future => {}
|
||||
_ = sleep(Duration::from_secs(30)) => {
|
||||
panic!("Future timed out");
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -462,6 +462,13 @@ pub static SYNCING_CHAIN_BATCH_AWAITING_PROCESSING: LazyLock<Result<Histogram>>
|
||||
]),
|
||||
)
|
||||
});
|
||||
pub static SYNCING_CHAIN_BATCHES: LazyLock<Result<IntGaugeVec>> = LazyLock::new(|| {
|
||||
try_create_int_gauge_vec(
|
||||
"sync_batches",
|
||||
"Number of batches in sync chains by sync type and state",
|
||||
&["sync_type", "state"],
|
||||
)
|
||||
});
|
||||
pub static SYNC_SINGLE_BLOCK_LOOKUPS: LazyLock<Result<IntGauge>> = LazyLock::new(|| {
|
||||
try_create_int_gauge(
|
||||
"sync_single_block_lookups",
|
||||
|
||||
@@ -977,7 +977,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
};
|
||||
|
||||
// remove all skip slots i.e. duplicated roots
|
||||
Ok(block_roots.into_iter().unique().collect::<Vec<_>>())
|
||||
Ok(block_roots
|
||||
.into_iter()
|
||||
.unique_by(|(root, _)| *root)
|
||||
.collect::<Vec<_>>())
|
||||
}
|
||||
|
||||
/// Handle a `BlobsByRange` request from the peer.
|
||||
|
||||
@@ -120,6 +120,39 @@ impl TestRig {
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn new_with_skip_slots(chain_length: u64, skip_slots: &HashSet<u64>) -> Self {
|
||||
let mut spec = test_spec::<E>();
|
||||
spec.shard_committee_period = 2;
|
||||
let spec = Arc::new(spec);
|
||||
let beacon_processor_config = BeaconProcessorConfig::default();
|
||||
let harness = BeaconChainHarness::builder(MainnetEthSpec)
|
||||
.spec(spec.clone())
|
||||
.deterministic_keypairs(VALIDATOR_COUNT)
|
||||
.fresh_ephemeral_store()
|
||||
.mock_execution_layer()
|
||||
.node_custody_type(NodeCustodyType::Fullnode)
|
||||
.chain_config(<_>::default())
|
||||
.build();
|
||||
|
||||
harness.advance_slot();
|
||||
|
||||
for slot in 1..=chain_length {
|
||||
if !skip_slots.contains(&slot) {
|
||||
harness
|
||||
.extend_chain(
|
||||
1,
|
||||
BlockStrategy::OnCanonicalHead,
|
||||
AttestationStrategy::AllValidators,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
harness.advance_slot();
|
||||
}
|
||||
|
||||
Self::from_harness(harness, beacon_processor_config, spec).await
|
||||
}
|
||||
|
||||
pub async fn new_parametric(
|
||||
chain_length: u64,
|
||||
beacon_processor_config: BeaconProcessorConfig,
|
||||
@@ -150,6 +183,14 @@ impl TestRig {
|
||||
harness.advance_slot();
|
||||
}
|
||||
|
||||
Self::from_harness(harness, beacon_processor_config, spec).await
|
||||
}
|
||||
|
||||
async fn from_harness(
|
||||
harness: BeaconChainHarness<T>,
|
||||
beacon_processor_config: BeaconProcessorConfig,
|
||||
spec: Arc<ChainSpec>,
|
||||
) -> Self {
|
||||
let head = harness.chain.head_snapshot();
|
||||
|
||||
assert_eq!(
|
||||
@@ -1986,3 +2027,78 @@ async fn test_data_columns_by_range_request_only_returns_requested_columns() {
|
||||
"Should have received at least some data columns"
|
||||
);
|
||||
}
|
||||
|
||||
/// Test that DataColumnsByRange does not return duplicate data columns for skip slots.
|
||||
///
|
||||
/// When skip slots occur, `forwards_iter_block_roots` returns the same block root for
|
||||
/// consecutive slots. The deduplication in `get_block_roots_from_store` must use
|
||||
/// `unique_by` on the root (not the full `(root, slot)` tuple) to avoid serving
|
||||
/// duplicate data columns for the same block.
|
||||
#[tokio::test]
|
||||
async fn test_data_columns_by_range_no_duplicates_with_skip_slots() {
|
||||
if test_spec::<E>().fulu_fork_epoch.is_none() {
|
||||
return;
|
||||
};
|
||||
|
||||
// Build a chain of 128 slots (4 epochs) with skip slots at positions 5 and 6.
|
||||
// After 4 epochs, finalized_epoch=2 (finalized_slot=64). Requesting slots 0-9
|
||||
// satisfies req_start_slot + req_count <= finalized_slot (10 <= 64), which routes
|
||||
// through `get_block_roots_from_store` — the code path with the bug.
|
||||
let skip_slots: HashSet<u64> = [5, 6].into_iter().collect();
|
||||
let mut rig = TestRig::new_with_skip_slots(128, &skip_slots).await;
|
||||
|
||||
let all_custody_columns = rig.chain.custody_columns_for_epoch(Some(Epoch::new(0)));
|
||||
let requested_column = vec![all_custody_columns[0]];
|
||||
|
||||
// Request a range that spans the skip slots (slots 0 through 9).
|
||||
let start_slot = 0;
|
||||
let slot_count = 10;
|
||||
|
||||
rig.network_beacon_processor
|
||||
.send_data_columns_by_range_request(
|
||||
PeerId::random(),
|
||||
InboundRequestId::new_unchecked(42, 24),
|
||||
DataColumnsByRangeRequest {
|
||||
start_slot,
|
||||
count: slot_count,
|
||||
columns: requested_column.clone(),
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// Collect block roots from all data column responses.
|
||||
let mut block_roots: Vec<Hash256> = Vec::new();
|
||||
|
||||
while let Some(next) = rig.network_rx.recv().await {
|
||||
if let NetworkMessage::SendResponse {
|
||||
peer_id: _,
|
||||
response: Response::DataColumnsByRange(data_column),
|
||||
inbound_request_id: _,
|
||||
} = next
|
||||
{
|
||||
if let Some(column) = data_column {
|
||||
block_roots.push(column.block_root());
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
panic!("unexpected message {:?}", next);
|
||||
}
|
||||
}
|
||||
|
||||
assert!(
|
||||
!block_roots.is_empty(),
|
||||
"Should have received at least some data columns"
|
||||
);
|
||||
|
||||
// Before the fix, skip slots caused the same block root to appear multiple times
|
||||
// (once per skip slot) because .unique() on (Hash256, Slot) tuples didn't deduplicate.
|
||||
let unique_roots: HashSet<_> = block_roots.iter().collect();
|
||||
assert_eq!(
|
||||
block_roots.len(),
|
||||
unique_roots.len(),
|
||||
"Response contained duplicate block roots: got {} columns but only {} unique roots",
|
||||
block_roots.len(),
|
||||
unique_roots.len(),
|
||||
);
|
||||
}
|
||||
|
||||
@@ -8,9 +8,11 @@
|
||||
//! If a batch fails, the backfill sync cannot progress. In this scenario, we mark the backfill
|
||||
//! sync as failed, log an error and attempt to retry once a new peer joins the node.
|
||||
|
||||
use crate::metrics;
|
||||
use crate::network_beacon_processor::ChainSegmentProcessId;
|
||||
use crate::sync::batch::{
|
||||
BatchConfig, BatchId, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState,
|
||||
BatchConfig, BatchId, BatchInfo, BatchMetricsState, BatchOperationOutcome,
|
||||
BatchProcessingResult, BatchState,
|
||||
};
|
||||
use crate::sync::block_sidecar_coupling::CouplingError;
|
||||
use crate::sync::manager::BatchProcessResult;
|
||||
@@ -31,6 +33,7 @@ use std::collections::{
|
||||
use std::hash::{Hash, Hasher};
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::Arc;
|
||||
use strum::IntoEnumIterator;
|
||||
use tracing::{debug, error, info, warn};
|
||||
use types::{ColumnIndex, Epoch, EthSpec};
|
||||
|
||||
@@ -1181,6 +1184,21 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
|
||||
.epoch(T::EthSpec::slots_per_epoch())
|
||||
}
|
||||
|
||||
pub fn register_metrics(&self) {
|
||||
for state in BatchMetricsState::iter() {
|
||||
let count = self
|
||||
.batches
|
||||
.values()
|
||||
.filter(|b| b.state().metrics_state() == state)
|
||||
.count();
|
||||
metrics::set_gauge_vec(
|
||||
&metrics::SYNCING_CHAIN_BATCHES,
|
||||
&["backfill", state.into()],
|
||||
count as i64,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Updates the global network state indicating the current state of a backfill sync.
|
||||
fn set_state(&self, state: BackFillState) {
|
||||
*self.network_globals.backfill_state.write() = state;
|
||||
|
||||
@@ -10,10 +10,22 @@ use std::marker::PhantomData;
|
||||
use std::ops::Sub;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use strum::Display;
|
||||
use strum::{Display, EnumIter, IntoStaticStr};
|
||||
use types::Slot;
|
||||
use types::{DataColumnSidecarList, Epoch, EthSpec};
|
||||
|
||||
/// Batch states used as metrics labels.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, EnumIter, IntoStaticStr)]
|
||||
#[strum(serialize_all = "snake_case")]
|
||||
pub enum BatchMetricsState {
|
||||
AwaitingDownload,
|
||||
Downloading,
|
||||
AwaitingProcessing,
|
||||
Processing,
|
||||
AwaitingValidation,
|
||||
Failed,
|
||||
}
|
||||
|
||||
pub type BatchId = Epoch;
|
||||
|
||||
/// Type of expected batch.
|
||||
@@ -142,6 +154,18 @@ impl<D: Hash> BatchState<D> {
|
||||
pub fn poison(&mut self) -> BatchState<D> {
|
||||
std::mem::replace(self, BatchState::Poisoned)
|
||||
}
|
||||
|
||||
/// Returns the metrics state for this batch.
|
||||
pub fn metrics_state(&self) -> BatchMetricsState {
|
||||
match self {
|
||||
BatchState::AwaitingDownload => BatchMetricsState::AwaitingDownload,
|
||||
BatchState::Downloading(_) => BatchMetricsState::Downloading,
|
||||
BatchState::AwaitingProcessing(..) => BatchMetricsState::AwaitingProcessing,
|
||||
BatchState::Processing(_) => BatchMetricsState::Processing,
|
||||
BatchState::AwaitingValidation(_) => BatchMetricsState::AwaitingValidation,
|
||||
BatchState::Poisoned | BatchState::Failed => BatchMetricsState::Failed,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: EthSpec, B: BatchConfig, D: Hash> BatchInfo<E, B, D> {
|
||||
|
||||
@@ -12,14 +12,16 @@ use lighthouse_network::{
|
||||
};
|
||||
use logging::crit;
|
||||
use std::hash::{DefaultHasher, Hash, Hasher};
|
||||
use strum::IntoEnumIterator;
|
||||
use tracing::{debug, error, info, info_span, warn};
|
||||
use types::{DataColumnSidecarList, Epoch, EthSpec};
|
||||
|
||||
use crate::metrics;
|
||||
use crate::sync::{
|
||||
backfill_sync::{BACKFILL_EPOCHS_PER_BATCH, ProcessResult, SyncStart},
|
||||
batch::{
|
||||
BatchConfig, BatchId, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState,
|
||||
ByRangeRequestType,
|
||||
BatchConfig, BatchId, BatchInfo, BatchMetricsState, BatchOperationOutcome,
|
||||
BatchProcessingResult, BatchState, ByRangeRequestType,
|
||||
},
|
||||
block_sidecar_coupling::CouplingError,
|
||||
manager::CustodyBatchProcessResult,
|
||||
@@ -1114,6 +1116,21 @@ impl<T: BeaconChainTypes> CustodyBackFillSync<T> {
|
||||
*self.network_globals.custody_sync_state.write() = state;
|
||||
}
|
||||
|
||||
pub fn register_metrics(&self) {
|
||||
for state in BatchMetricsState::iter() {
|
||||
let count = self
|
||||
.batches
|
||||
.values()
|
||||
.filter(|b| b.state().metrics_state() == state)
|
||||
.count();
|
||||
metrics::set_gauge_vec(
|
||||
&metrics::SYNCING_CHAIN_BATCHES,
|
||||
&["custody_backfill", state.into()],
|
||||
count as i64,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// A fully synced peer has joined us.
|
||||
/// If we are in a failed state, update a local variable to indicate we are able to restart
|
||||
/// the failed sync on the next attempt.
|
||||
|
||||
@@ -784,6 +784,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
}
|
||||
_ = register_metrics_interval.tick() => {
|
||||
self.network.register_metrics();
|
||||
self.range_sync.register_metrics();
|
||||
self.backfill_sync.register_metrics();
|
||||
self.custody_backfill_sync.register_metrics();
|
||||
}
|
||||
_ = epoch_interval.tick() => {
|
||||
self.update_sync_state();
|
||||
|
||||
@@ -3,7 +3,8 @@ use crate::metrics;
|
||||
use crate::network_beacon_processor::ChainSegmentProcessId;
|
||||
use crate::sync::batch::BatchId;
|
||||
use crate::sync::batch::{
|
||||
BatchConfig, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState,
|
||||
BatchConfig, BatchInfo, BatchMetricsState, BatchOperationOutcome, BatchProcessingResult,
|
||||
BatchState,
|
||||
};
|
||||
use crate::sync::block_sidecar_coupling::CouplingError;
|
||||
use crate::sync::network_context::{RangeRequestId, RpcRequestSendError, RpcResponseError};
|
||||
@@ -234,6 +235,14 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
.sum()
|
||||
}
|
||||
|
||||
/// Returns the number of batches in the given metrics state.
|
||||
pub fn count_batches_in_state(&self, state: BatchMetricsState) -> usize {
|
||||
self.batches
|
||||
.values()
|
||||
.filter(|b| b.state().metrics_state() == state)
|
||||
.count()
|
||||
}
|
||||
|
||||
/// Removes a peer from the chain.
|
||||
/// If the peer has active batches, those are considered failed and re-requested.
|
||||
pub fn remove_peer(&mut self, peer_id: &PeerId) -> ProcessingResult {
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
use super::chain::{ChainId, ProcessingResult, RemoveChain, SyncingChain};
|
||||
use super::sync_type::RangeSyncType;
|
||||
use crate::metrics;
|
||||
use crate::sync::batch::BatchMetricsState;
|
||||
use crate::sync::network_context::SyncNetworkContext;
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
use fnv::FnvHashMap;
|
||||
@@ -17,6 +18,7 @@ use smallvec::SmallVec;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::sync::Arc;
|
||||
use strum::IntoEnumIterator;
|
||||
use tracing::{debug, error};
|
||||
use types::EthSpec;
|
||||
use types::{Epoch, Hash256, Slot};
|
||||
@@ -351,7 +353,8 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
|
||||
.iter()
|
||||
.map(|(id, chain)| (chain.available_peers(), !chain.is_syncing(), *id))
|
||||
.collect::<Vec<_>>();
|
||||
preferred_ids.sort_unstable();
|
||||
// Sort in descending order
|
||||
preferred_ids.sort_unstable_by(|a, b| b.cmp(a));
|
||||
|
||||
let mut syncing_chains = SmallVec::<[Id; PARALLEL_HEAD_CHAINS]>::new();
|
||||
for (_, _, id) in preferred_ids {
|
||||
@@ -515,6 +518,25 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn register_metrics(&self) {
|
||||
for (sync_type, chains) in [
|
||||
("range_finalized", &self.finalized_chains),
|
||||
("range_head", &self.head_chains),
|
||||
] {
|
||||
for state in BatchMetricsState::iter() {
|
||||
let count: usize = chains
|
||||
.values()
|
||||
.map(|chain| chain.count_batches_in_state(state))
|
||||
.sum();
|
||||
metrics::set_gauge_vec(
|
||||
&metrics::SYNCING_CHAIN_BATCHES,
|
||||
&[sync_type, state.into()],
|
||||
count as i64,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn update_metrics(&self) {
|
||||
metrics::set_gauge_vec(
|
||||
&metrics::SYNCING_CHAINS_COUNT,
|
||||
|
||||
@@ -371,6 +371,10 @@ where
|
||||
.update(network, &local, &mut self.awaiting_head_peers);
|
||||
}
|
||||
|
||||
pub fn register_metrics(&self) {
|
||||
self.chains.register_metrics();
|
||||
}
|
||||
|
||||
/// Kickstarts sync.
|
||||
pub fn resume(&mut self, network: &mut SyncNetworkContext<T>) {
|
||||
for (removed_chain, sync_type, remove_reason) in
|
||||
|
||||
@@ -76,8 +76,6 @@ const HTTP_GET_BEACON_BLOCK_SSZ_TIMEOUT_QUOTIENT: u32 = 4;
|
||||
const HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT: u32 = 4;
|
||||
const HTTP_GET_DEPOSIT_SNAPSHOT_QUOTIENT: u32 = 4;
|
||||
const HTTP_GET_VALIDATOR_BLOCK_TIMEOUT_QUOTIENT: u32 = 4;
|
||||
// Generally the timeout for events should be longer than a slot.
|
||||
const HTTP_GET_EVENTS_TIMEOUT_MULTIPLIER: u32 = 50;
|
||||
const HTTP_DEFAULT_TIMEOUT_QUOTIENT: u32 = 4;
|
||||
|
||||
/// A struct to define a variety of different timeouts for different validator tasks to ensure
|
||||
@@ -98,7 +96,6 @@ pub struct Timeouts {
|
||||
pub get_debug_beacon_states: Duration,
|
||||
pub get_deposit_snapshot: Duration,
|
||||
pub get_validator_block: Duration,
|
||||
pub events: Duration,
|
||||
pub default: Duration,
|
||||
}
|
||||
|
||||
@@ -119,7 +116,6 @@ impl Timeouts {
|
||||
get_debug_beacon_states: timeout,
|
||||
get_deposit_snapshot: timeout,
|
||||
get_validator_block: timeout,
|
||||
events: HTTP_GET_EVENTS_TIMEOUT_MULTIPLIER * timeout,
|
||||
default: timeout,
|
||||
}
|
||||
}
|
||||
@@ -142,7 +138,6 @@ impl Timeouts {
|
||||
get_debug_beacon_states: base_timeout / HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT,
|
||||
get_deposit_snapshot: base_timeout / HTTP_GET_DEPOSIT_SNAPSHOT_QUOTIENT,
|
||||
get_validator_block: base_timeout / HTTP_GET_VALIDATOR_BLOCK_TIMEOUT_QUOTIENT,
|
||||
events: HTTP_GET_EVENTS_TIMEOUT_MULTIPLIER * base_timeout,
|
||||
default: base_timeout / HTTP_DEFAULT_TIMEOUT_QUOTIENT,
|
||||
}
|
||||
}
|
||||
@@ -2805,10 +2800,14 @@ impl BeaconNodeHttpClient {
|
||||
.join(",");
|
||||
path.query_pairs_mut().append_pair("topics", &topic_string);
|
||||
|
||||
// Do not use a timeout for the events endpoint. Using a regular timeout will trigger a
|
||||
// timeout every `timeout` seconds, regardless of any data streamed from the endpoint.
|
||||
// In future we could add a read_timeout, but that can only be configured globally on the
|
||||
// Client.
|
||||
let mut es = self
|
||||
.client
|
||||
.get(path)
|
||||
.timeout(self.timeouts.events)
|
||||
.timeout(Duration::MAX)
|
||||
.eventsource()
|
||||
.map_err(Error::SseEventSource)?;
|
||||
// If we don't await `Event::Open` here, then the consumer
|
||||
|
||||
Reference in New Issue
Block a user