mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-03 08:41:43 +00:00
Compare commits
236 Commits
v8.0.0-rc.
...
beacon-api
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6e54763c79 | ||
|
|
f290c68c93 | ||
|
|
7b283c5ddb | ||
|
|
0e5993943e | ||
|
|
4065ef66ab | ||
|
|
9f40d91d51 | ||
|
|
e3ce7fc5ea | ||
|
|
71a2eadc46 | ||
|
|
386aacda2a | ||
|
|
c4cb8ad833 | ||
|
|
d394746248 | ||
|
|
d1357e459a | ||
|
|
80266a8109 | ||
|
|
dabb3d12dc | ||
|
|
0c2ee92f90 | ||
|
|
f405601d6f | ||
|
|
39d41ada93 | ||
|
|
6766f329e3 | ||
|
|
69ac34209c | ||
|
|
c9fe10b366 | ||
|
|
257bcc37fc | ||
|
|
033457ce89 | ||
|
|
806a5ebe1f | ||
|
|
897f06a29c | ||
|
|
4a858b3f6b | ||
|
|
51a8c80069 | ||
|
|
a8d84d69c5 | ||
|
|
87fde510b8 | ||
|
|
d137881614 | ||
|
|
68fd7a7881 | ||
|
|
cf030d0a8a | ||
|
|
5517c78102 | ||
|
|
8fc533368c | ||
|
|
09141ec51a | ||
|
|
8715589e40 | ||
|
|
7509cf6d3b | ||
|
|
70a80d5da0 | ||
|
|
339d1b8229 | ||
|
|
13b1b05960 | ||
|
|
ebbb17b6bc | ||
|
|
b6913ae542 | ||
|
|
27ed90e4dc | ||
|
|
68035eb5e6 | ||
|
|
09f48c5527 | ||
|
|
af98e98c25 | ||
|
|
c276af6061 | ||
|
|
536c9f83b6 | ||
|
|
dd0aa8e2ec | ||
|
|
efb8a01e91 | ||
|
|
f85a124362 | ||
|
|
0e2add2daa | ||
|
|
381bbaba94 | ||
|
|
afb9122cc1 | ||
|
|
6e4483288a | ||
|
|
3977b92c49 | ||
|
|
d67270f899 | ||
|
|
a8d8989c05 | ||
|
|
9e6e76fb89 | ||
|
|
70a2d4de10 | ||
|
|
cbb7c5d8f4 | ||
|
|
370d511223 | ||
|
|
4d4c268e1e | ||
|
|
7fce143300 | ||
|
|
4d3edfeaed | ||
|
|
7521f97ca5 | ||
|
|
d26473621a | ||
|
|
444cd625ef | ||
|
|
6f0b78426a | ||
|
|
6a4d842376 | ||
|
|
dec7cff9c7 | ||
|
|
2634a1f1a6 | ||
|
|
7af3f2eb35 | ||
|
|
9e84779522 | ||
|
|
45d007a71f | ||
|
|
5070ab254d | ||
|
|
f0492852f3 | ||
|
|
4f08f6e0da | ||
|
|
5acc0523df | ||
|
|
1d0e3f4d30 | ||
|
|
960f8c5c48 | ||
|
|
795eff9bf4 | ||
|
|
3ec21a2435 | ||
|
|
dd0d5e2d93 | ||
|
|
d87541c045 | ||
|
|
9a01b6b363 | ||
|
|
3ac3ddb2b7 | ||
|
|
c4f2284dbe | ||
|
|
d5aa2d8dfe | ||
|
|
d7f3c9583e | ||
|
|
35e07eb0a9 | ||
|
|
a5ee0ed91f | ||
|
|
f57fa8788d | ||
|
|
c43d1c2884 | ||
|
|
8dc9f38a60 | ||
|
|
b21b1086f1 | ||
|
|
c2c2bafa9a | ||
|
|
49db91b27e | ||
|
|
772ab53811 | ||
|
|
f25531d4cc | ||
|
|
77c630bc2e | ||
|
|
f9d354539a | ||
|
|
7a408b7724 | ||
|
|
7d3a5dfab4 | ||
|
|
40139440c9 | ||
|
|
a647a3635f | ||
|
|
29ed1c5c26 | ||
|
|
49de63f792 | ||
|
|
b61d244c0c | ||
|
|
e340998241 | ||
|
|
75432e1135 | ||
|
|
72abfa4fec | ||
|
|
aed25c49e3 | ||
|
|
36a7b1280f | ||
|
|
1aa410cd8a | ||
|
|
57b6a9ab91 | ||
|
|
9440c36202 | ||
|
|
3e10e68c1d | ||
|
|
3f169ef17a | ||
|
|
469296b665 | ||
|
|
bb734afa1d | ||
|
|
154b7a7b8a | ||
|
|
82858bc04e | ||
|
|
987abe07f9 | ||
|
|
f9c50bca07 | ||
|
|
bafb5f0cc0 | ||
|
|
8e537d139e | ||
|
|
a8088f1bfa | ||
|
|
79a5f2556f | ||
|
|
210ad2ff77 | ||
|
|
0c29896438 | ||
|
|
d8941d70b6 | ||
|
|
fc15736fcb | ||
|
|
227aa4bc4f | ||
|
|
4f0ecf2a5c | ||
|
|
ec055f4717 | ||
|
|
8506fb056f | ||
|
|
84689379af | ||
|
|
7f5490675c | ||
|
|
c680164742 | ||
|
|
812b3d77d0 | ||
|
|
5f73d315b5 | ||
|
|
c53d4ac459 | ||
|
|
793764f066 | ||
|
|
217fa9f805 | ||
|
|
179324b9fa | ||
|
|
67ba04e9ec | ||
|
|
97e88dd23d | ||
|
|
c900a88461 | ||
|
|
aaf8e503c6 | ||
|
|
af7ba6ff70 | ||
|
|
3b1fb0ad81 | ||
|
|
b8dc6288f1 | ||
|
|
fc2c942de4 | ||
|
|
28cf796072 | ||
|
|
75f22ee844 | ||
|
|
f4907ef971 | ||
|
|
a75257fb6e | ||
|
|
1ab786a9a9 | ||
|
|
518a91a7a6 | ||
|
|
261551e3c6 | ||
|
|
a97e86c1a6 | ||
|
|
9a22eb8698 | ||
|
|
5364ba53fa | ||
|
|
40c4c00097 | ||
|
|
b819d2d0a6 | ||
|
|
e1dcfb6960 | ||
|
|
4b28872671 | ||
|
|
9bd430bea2 | ||
|
|
677a94d507 | ||
|
|
f60eac6abc | ||
|
|
89e4de90d5 | ||
|
|
7926afeb18 | ||
|
|
be9c4bb587 | ||
|
|
6d2c396ef2 | ||
|
|
ba02ffc162 | ||
|
|
08e045875f | ||
|
|
72548cb54e | ||
|
|
16265ef455 | ||
|
|
437e8516cd | ||
|
|
d505c04507 | ||
|
|
e494b411e7 | ||
|
|
9b5ea9d867 | ||
|
|
aa83e8b889 | ||
|
|
e4485570f2 | ||
|
|
6477eecc65 | ||
|
|
411fcee2ac | ||
|
|
f9d4a28168 | ||
|
|
fae4a2bccc | ||
|
|
19f8333a8b | ||
|
|
b807d39bad | ||
|
|
ca0967119b | ||
|
|
ab9e58aa3d | ||
|
|
c30f70906b | ||
|
|
cb8c8f59cf | ||
|
|
c575cd61b7 | ||
|
|
6fe919a8e7 | ||
|
|
36a559e11a | ||
|
|
07229b76ed | ||
|
|
3ea3d226e1 | ||
|
|
e32dfcdcad | ||
|
|
7cb7653d36 | ||
|
|
c20fc48eb4 | ||
|
|
c8fca4f1d0 | ||
|
|
5e1d5ff641 | ||
|
|
dd5c9a8c81 | ||
|
|
683de56f6e | ||
|
|
3ef7c9078e | ||
|
|
1ddd078d32 | ||
|
|
ca2a946175 | ||
|
|
42a499373f | ||
|
|
7abb7621d5 | ||
|
|
721e73fd82 | ||
|
|
43c3f63e30 | ||
|
|
7c0a8f840e | ||
|
|
f30246b9d4 | ||
|
|
90179d4a88 | ||
|
|
8517236aed | ||
|
|
5728f78032 | ||
|
|
75ab913a3a | ||
|
|
31955c2e7f | ||
|
|
c40bec9319 | ||
|
|
32357d8f0a | ||
|
|
1d5f75582f | ||
|
|
3c688410cc | ||
|
|
f1f9f92dec | ||
|
|
e0abede1d1 | ||
|
|
e2e82ff1b9 | ||
|
|
2c2e44c4ed | ||
|
|
38382a3ca1 | ||
|
|
9f6de8e5d7 | ||
|
|
19a9479234 | ||
|
|
7c6526d978 | ||
|
|
9b98f4e297 | ||
|
|
3a41e137d1 | ||
|
|
e6c7f145dd | ||
|
|
3b7132bc0d |
@@ -546,12 +546,20 @@ impl<E: EthSpec> Eth1ChainBackend<E> for CachingEth1Backend<E> {
|
||||
state.eth1_data().deposit_count
|
||||
};
|
||||
|
||||
match deposit_index.cmp(&deposit_count) {
|
||||
// [New in Electra:EIP6110]
|
||||
let deposit_index_limit =
|
||||
if let Ok(deposit_receipts_start_index) = state.deposit_requests_start_index() {
|
||||
std::cmp::min(deposit_count, deposit_receipts_start_index)
|
||||
} else {
|
||||
deposit_count
|
||||
};
|
||||
|
||||
match deposit_index.cmp(&deposit_index_limit) {
|
||||
Ordering::Greater => Err(Error::DepositIndexTooHigh),
|
||||
Ordering::Equal => Ok(vec![]),
|
||||
Ordering::Less => {
|
||||
let next = deposit_index;
|
||||
let last = std::cmp::min(deposit_count, next + E::MaxDeposits::to_u64());
|
||||
let last = std::cmp::min(deposit_index_limit, next + E::MaxDeposits::to_u64());
|
||||
|
||||
self.core
|
||||
.deposits()
|
||||
|
||||
@@ -31,6 +31,7 @@ mod validators;
|
||||
mod version;
|
||||
|
||||
use crate::produce_block::{produce_blinded_block_v2, produce_block_v2, produce_block_v3};
|
||||
use crate::version::fork_versioned_response;
|
||||
use beacon_chain::{
|
||||
attestation_verification::VerifiedAttestation, observed_operations::ObservationOutcome,
|
||||
validator_monitor::timestamp_now, AttestationError as AttnError, BeaconChain, BeaconChainError,
|
||||
@@ -256,12 +257,15 @@ pub fn prometheus_metrics() -> warp::filters::log::Log<impl Fn(warp::filters::lo
|
||||
.or_else(|| starts_with("v1/validator/duties/sync"))
|
||||
.or_else(|| starts_with("v1/validator/attestation_data"))
|
||||
.or_else(|| starts_with("v1/validator/aggregate_attestation"))
|
||||
.or_else(|| starts_with("v2/validator/aggregate_attestation"))
|
||||
.or_else(|| starts_with("v1/validator/aggregate_and_proofs"))
|
||||
.or_else(|| starts_with("v2/validator/aggregate_and_proofs"))
|
||||
.or_else(|| starts_with("v1/validator/sync_committee_contribution"))
|
||||
.or_else(|| starts_with("v1/validator/contribution_and_proofs"))
|
||||
.or_else(|| starts_with("v1/validator/beacon_committee_subscriptions"))
|
||||
.or_else(|| starts_with("v1/validator/sync_committee_subscriptions"))
|
||||
.or_else(|| starts_with("v1/beacon/pool/attestations"))
|
||||
.or_else(|| starts_with("v2/beacon/pool/attestations"))
|
||||
.or_else(|| starts_with("v1/beacon/pool/sync_committees"))
|
||||
.or_else(|| starts_with("v1/beacon/blocks/head/root"))
|
||||
.or_else(|| starts_with("v1/validator/prepare_beacon_proposer"))
|
||||
@@ -1623,26 +1627,38 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
);
|
||||
|
||||
// GET beacon/blocks/{block_id}/attestations
|
||||
let get_beacon_block_attestations = beacon_blocks_path_v1
|
||||
let get_beacon_block_attestations = beacon_blocks_path_any
|
||||
.clone()
|
||||
.and(warp::path("attestations"))
|
||||
.and(warp::path::end())
|
||||
.then(
|
||||
|block_id: BlockId,
|
||||
|endpoint_version: EndpointVersion,
|
||||
block_id: BlockId,
|
||||
task_spawner: TaskSpawner<T::EthSpec>,
|
||||
chain: Arc<BeaconChain<T>>| {
|
||||
task_spawner.blocking_json_task(Priority::P1, move || {
|
||||
task_spawner.blocking_response_task(Priority::P1, move || {
|
||||
let (block, execution_optimistic, finalized) =
|
||||
block_id.blinded_block(&chain)?;
|
||||
Ok(api_types::GenericResponse::from(
|
||||
block
|
||||
.message()
|
||||
.body()
|
||||
.attestations()
|
||||
.map(|att| att.clone_as_attestation())
|
||||
.collect::<Vec<_>>(),
|
||||
)
|
||||
.add_execution_optimistic_finalized(execution_optimistic, finalized))
|
||||
let fork_name = block
|
||||
.fork_name(&chain.spec)
|
||||
.map_err(inconsistent_fork_rejection)?;
|
||||
let atts = block
|
||||
.message()
|
||||
.body()
|
||||
.attestations()
|
||||
.map(|att| att.clone_as_attestation())
|
||||
.collect::<Vec<_>>();
|
||||
let res = execution_optimistic_finalized_fork_versioned_response(
|
||||
endpoint_version,
|
||||
fork_name,
|
||||
execution_optimistic,
|
||||
finalized,
|
||||
&atts,
|
||||
)?;
|
||||
Ok(add_consensus_version_header(
|
||||
warp::reply::json(&res).into_response(),
|
||||
fork_name,
|
||||
))
|
||||
})
|
||||
},
|
||||
);
|
||||
@@ -1750,8 +1766,14 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
.and(task_spawner_filter.clone())
|
||||
.and(chain_filter.clone());
|
||||
|
||||
let beacon_pool_path_any = any_version
|
||||
.and(warp::path("beacon"))
|
||||
.and(warp::path("pool"))
|
||||
.and(task_spawner_filter.clone())
|
||||
.and(chain_filter.clone());
|
||||
|
||||
// POST beacon/pool/attestations
|
||||
let post_beacon_pool_attestations = beacon_pool_path
|
||||
let post_beacon_pool_attestations = beacon_pool_path_any
|
||||
.clone()
|
||||
.and(warp::path("attestations"))
|
||||
.and(warp::path::end())
|
||||
@@ -1760,7 +1782,11 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
.and(reprocess_send_filter)
|
||||
.and(log_filter.clone())
|
||||
.then(
|
||||
|task_spawner: TaskSpawner<T::EthSpec>,
|
||||
// V1 and V2 are identical except V2 has a consensus version header in the request.
|
||||
// We only require this header for SSZ deserialization, which isn't supported for
|
||||
// this endpoint presently.
|
||||
|_endpoint_version: EndpointVersion,
|
||||
task_spawner: TaskSpawner<T::EthSpec>,
|
||||
chain: Arc<BeaconChain<T>>,
|
||||
attestations: Vec<Attestation<T::EthSpec>>,
|
||||
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
|
||||
@@ -1781,16 +1807,17 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
);
|
||||
|
||||
// GET beacon/pool/attestations?committee_index,slot
|
||||
let get_beacon_pool_attestations = beacon_pool_path
|
||||
let get_beacon_pool_attestations = beacon_pool_path_any
|
||||
.clone()
|
||||
.and(warp::path("attestations"))
|
||||
.and(warp::path::end())
|
||||
.and(warp::query::<api_types::AttestationPoolQuery>())
|
||||
.then(
|
||||
|task_spawner: TaskSpawner<T::EthSpec>,
|
||||
|endpoint_version: EndpointVersion,
|
||||
task_spawner: TaskSpawner<T::EthSpec>,
|
||||
chain: Arc<BeaconChain<T>>,
|
||||
query: api_types::AttestationPoolQuery| {
|
||||
task_spawner.blocking_json_task(Priority::P1, move || {
|
||||
task_spawner.blocking_response_task(Priority::P1, move || {
|
||||
let query_filter = |data: &AttestationData| {
|
||||
query.slot.map_or(true, |slot| slot == data.slot)
|
||||
&& query
|
||||
@@ -1807,20 +1834,48 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
.filter(|&att| query_filter(att.data()))
|
||||
.cloned(),
|
||||
);
|
||||
Ok(api_types::GenericResponse::from(attestations))
|
||||
// Use the current slot to find the fork version, and convert all messages to the
|
||||
// current fork's format. This is to ensure consistent message types matching
|
||||
// `Eth-Consensus-Version`.
|
||||
let current_slot =
|
||||
chain
|
||||
.slot_clock
|
||||
.now()
|
||||
.ok_or(warp_utils::reject::custom_server_error(
|
||||
"unable to read slot clock".to_string(),
|
||||
))?;
|
||||
let fork_name = chain.spec.fork_name_at_slot::<T::EthSpec>(current_slot);
|
||||
let attestations = attestations
|
||||
.into_iter()
|
||||
.filter(|att| {
|
||||
(fork_name.electra_enabled() && matches!(att, Attestation::Electra(_)))
|
||||
|| (!fork_name.electra_enabled()
|
||||
&& matches!(att, Attestation::Base(_)))
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let res = fork_versioned_response(endpoint_version, fork_name, &attestations)?;
|
||||
Ok(add_consensus_version_header(
|
||||
warp::reply::json(&res).into_response(),
|
||||
fork_name,
|
||||
))
|
||||
})
|
||||
},
|
||||
);
|
||||
|
||||
// POST beacon/pool/attester_slashings
|
||||
let post_beacon_pool_attester_slashings = beacon_pool_path
|
||||
let post_beacon_pool_attester_slashings = beacon_pool_path_any
|
||||
.clone()
|
||||
.and(warp::path("attester_slashings"))
|
||||
.and(warp::path::end())
|
||||
.and(warp_utils::json::json())
|
||||
.and(network_tx_filter.clone())
|
||||
.then(
|
||||
|task_spawner: TaskSpawner<T::EthSpec>,
|
||||
// V1 and V2 are identical except V2 has a consensus version header in the request.
|
||||
// We only require this header for SSZ deserialization, which isn't supported for
|
||||
// this endpoint presently.
|
||||
|_endpoint_version: EndpointVersion,
|
||||
task_spawner: TaskSpawner<T::EthSpec>,
|
||||
chain: Arc<BeaconChain<T>>,
|
||||
slashing: AttesterSlashing<T::EthSpec>,
|
||||
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| {
|
||||
@@ -1857,18 +1912,45 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
);
|
||||
|
||||
// GET beacon/pool/attester_slashings
|
||||
let get_beacon_pool_attester_slashings = beacon_pool_path
|
||||
.clone()
|
||||
.and(warp::path("attester_slashings"))
|
||||
.and(warp::path::end())
|
||||
.then(
|
||||
|task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
|
||||
task_spawner.blocking_json_task(Priority::P1, move || {
|
||||
let attestations = chain.op_pool.get_all_attester_slashings();
|
||||
Ok(api_types::GenericResponse::from(attestations))
|
||||
})
|
||||
},
|
||||
);
|
||||
let get_beacon_pool_attester_slashings =
|
||||
beacon_pool_path_any
|
||||
.clone()
|
||||
.and(warp::path("attester_slashings"))
|
||||
.and(warp::path::end())
|
||||
.then(
|
||||
|endpoint_version: EndpointVersion,
|
||||
task_spawner: TaskSpawner<T::EthSpec>,
|
||||
chain: Arc<BeaconChain<T>>| {
|
||||
task_spawner.blocking_response_task(Priority::P1, move || {
|
||||
let slashings = chain.op_pool.get_all_attester_slashings();
|
||||
|
||||
// Use the current slot to find the fork version, and convert all messages to the
|
||||
// current fork's format. This is to ensure consistent message types matching
|
||||
// `Eth-Consensus-Version`.
|
||||
let current_slot = chain.slot_clock.now().ok_or(
|
||||
warp_utils::reject::custom_server_error(
|
||||
"unable to read slot clock".to_string(),
|
||||
),
|
||||
)?;
|
||||
let fork_name = chain.spec.fork_name_at_slot::<T::EthSpec>(current_slot);
|
||||
let slashings = slashings
|
||||
.into_iter()
|
||||
.filter(|slashing| {
|
||||
(fork_name.electra_enabled()
|
||||
&& matches!(slashing, AttesterSlashing::Electra(_)))
|
||||
|| (!fork_name.electra_enabled()
|
||||
&& matches!(slashing, AttesterSlashing::Base(_)))
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let res = fork_versioned_response(endpoint_version, fork_name, &slashings)?;
|
||||
Ok(add_consensus_version_header(
|
||||
warp::reply::json(&res).into_response(),
|
||||
fork_name,
|
||||
))
|
||||
})
|
||||
},
|
||||
);
|
||||
|
||||
// POST beacon/pool/proposer_slashings
|
||||
let post_beacon_pool_proposer_slashings = beacon_pool_path
|
||||
@@ -3175,7 +3257,7 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
);
|
||||
|
||||
// GET validator/aggregate_attestation?attestation_data_root,slot
|
||||
let get_validator_aggregate_attestation = eth_v1
|
||||
let get_validator_aggregate_attestation = any_version
|
||||
.and(warp::path("validator"))
|
||||
.and(warp::path("aggregate_attestation"))
|
||||
.and(warp::path::end())
|
||||
@@ -3184,29 +3266,45 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
.and(task_spawner_filter.clone())
|
||||
.and(chain_filter.clone())
|
||||
.then(
|
||||
|query: api_types::ValidatorAggregateAttestationQuery,
|
||||
|endpoint_version: EndpointVersion,
|
||||
query: api_types::ValidatorAggregateAttestationQuery,
|
||||
not_synced_filter: Result<(), Rejection>,
|
||||
task_spawner: TaskSpawner<T::EthSpec>,
|
||||
chain: Arc<BeaconChain<T>>| {
|
||||
task_spawner.blocking_json_task(Priority::P0, move || {
|
||||
not_synced_filter?;
|
||||
chain
|
||||
.get_pre_electra_aggregated_attestation_by_slot_and_root(
|
||||
let res = if endpoint_version == V2 {
|
||||
let Some(committee_index) = query.committee_index else {
|
||||
return Err(warp_utils::reject::custom_bad_request(
|
||||
"missing committee index".to_string(),
|
||||
));
|
||||
};
|
||||
chain.get_aggregated_attestation_electra(
|
||||
query.slot,
|
||||
&query.attestation_data_root,
|
||||
committee_index,
|
||||
)
|
||||
} else if endpoint_version == V1 {
|
||||
// Do nothing
|
||||
chain.get_pre_electra_aggregated_attestation_by_slot_and_root(
|
||||
query.slot,
|
||||
&query.attestation_data_root,
|
||||
)
|
||||
.map_err(|e| {
|
||||
warp_utils::reject::custom_bad_request(format!(
|
||||
"unable to fetch aggregate: {:?}",
|
||||
e
|
||||
))
|
||||
})?
|
||||
.map(api_types::GenericResponse::from)
|
||||
.ok_or_else(|| {
|
||||
warp_utils::reject::custom_not_found(
|
||||
"no matching aggregate found".to_string(),
|
||||
)
|
||||
})
|
||||
} else {
|
||||
return Err(unsupported_version_rejection(endpoint_version));
|
||||
};
|
||||
res.map_err(|e| {
|
||||
warp_utils::reject::custom_bad_request(format!(
|
||||
"unable to fetch aggregate: {:?}",
|
||||
e
|
||||
))
|
||||
})?
|
||||
.map(api_types::GenericResponse::from)
|
||||
.ok_or_else(|| {
|
||||
warp_utils::reject::custom_not_found(
|
||||
"no matching aggregate found".to_string(),
|
||||
)
|
||||
})
|
||||
})
|
||||
},
|
||||
);
|
||||
@@ -3302,7 +3400,7 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
);
|
||||
|
||||
// POST validator/aggregate_and_proofs
|
||||
let post_validator_aggregate_and_proofs = eth_v1
|
||||
let post_validator_aggregate_and_proofs = any_version
|
||||
.and(warp::path("validator"))
|
||||
.and(warp::path("aggregate_and_proofs"))
|
||||
.and(warp::path::end())
|
||||
@@ -3313,7 +3411,11 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
.and(network_tx_filter.clone())
|
||||
.and(log_filter.clone())
|
||||
.then(
|
||||
|not_synced_filter: Result<(), Rejection>,
|
||||
// V1 and V2 are identical except V2 has a consensus version header in the request.
|
||||
// We only require this header for SSZ deserialization, which isn't supported for
|
||||
// this endpoint presently.
|
||||
|_endpoint_version: EndpointVersion,
|
||||
not_synced_filter: Result<(), Rejection>,
|
||||
task_spawner: TaskSpawner<T::EthSpec>,
|
||||
chain: Arc<BeaconChain<T>>,
|
||||
aggregates: Vec<SignedAggregateAndProof<T::EthSpec>>,
|
||||
|
||||
@@ -150,8 +150,13 @@ async fn attestations_across_fork_with_skip_slots() {
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
assert!(!unaggregated_attestations.is_empty());
|
||||
let fork_name = harness.spec.fork_name_at_slot::<E>(fork_slot);
|
||||
client
|
||||
.post_beacon_pool_attestations(&unaggregated_attestations)
|
||||
.post_beacon_pool_attestations_v1(&unaggregated_attestations)
|
||||
.await
|
||||
.unwrap();
|
||||
client
|
||||
.post_beacon_pool_attestations_v2(&unaggregated_attestations, fork_name)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -162,7 +167,11 @@ async fn attestations_across_fork_with_skip_slots() {
|
||||
assert!(!signed_aggregates.is_empty());
|
||||
|
||||
client
|
||||
.post_validator_aggregate_and_proof(&signed_aggregates)
|
||||
.post_validator_aggregate_and_proof_v1(&signed_aggregates)
|
||||
.await
|
||||
.unwrap();
|
||||
client
|
||||
.post_validator_aggregate_and_proof_v2(&signed_aggregates, fork_name)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
@@ -893,9 +893,10 @@ async fn queue_attestations_from_http() {
|
||||
.flat_map(|attestations| attestations.into_iter().map(|(att, _subnet)| att))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let fork_name = tester.harness.spec.fork_name_at_slot::<E>(attestation_slot);
|
||||
let attestation_future = tokio::spawn(async move {
|
||||
client
|
||||
.post_beacon_pool_attestations(&attestations)
|
||||
.post_beacon_pool_attestations_v2(&attestations, fork_name)
|
||||
.await
|
||||
.expect("attestations should be processed successfully")
|
||||
});
|
||||
|
||||
@@ -1668,7 +1668,7 @@ impl ApiTester {
|
||||
for block_id in self.interesting_block_ids() {
|
||||
let result = self
|
||||
.client
|
||||
.get_beacon_blocks_attestations(block_id.0)
|
||||
.get_beacon_blocks_attestations_v2(block_id.0)
|
||||
.await
|
||||
.unwrap()
|
||||
.map(|res| res.data);
|
||||
@@ -1699,9 +1699,9 @@ impl ApiTester {
|
||||
self
|
||||
}
|
||||
|
||||
pub async fn test_post_beacon_pool_attestations_valid(mut self) -> Self {
|
||||
pub async fn test_post_beacon_pool_attestations_valid_v1(mut self) -> Self {
|
||||
self.client
|
||||
.post_beacon_pool_attestations(self.attestations.as_slice())
|
||||
.post_beacon_pool_attestations_v1(self.attestations.as_slice())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -1713,7 +1713,25 @@ impl ApiTester {
|
||||
self
|
||||
}
|
||||
|
||||
pub async fn test_post_beacon_pool_attestations_invalid(mut self) -> Self {
|
||||
pub async fn test_post_beacon_pool_attestations_valid_v2(mut self) -> Self {
|
||||
let fork_name = self
|
||||
.attestations
|
||||
.first()
|
||||
.map(|att| self.chain.spec.fork_name_at_slot::<E>(att.data().slot))
|
||||
.unwrap();
|
||||
self.client
|
||||
.post_beacon_pool_attestations_v2(self.attestations.as_slice(), fork_name)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(
|
||||
self.network_rx.network_recv.recv().await.is_some(),
|
||||
"valid attestation should be sent to network"
|
||||
);
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
pub async fn test_post_beacon_pool_attestations_invalid_v1(mut self) -> Self {
|
||||
let mut attestations = Vec::new();
|
||||
for attestation in &self.attestations {
|
||||
let mut invalid_attestation = attestation.clone();
|
||||
@@ -1726,7 +1744,7 @@ impl ApiTester {
|
||||
|
||||
let err = self
|
||||
.client
|
||||
.post_beacon_pool_attestations(attestations.as_slice())
|
||||
.post_beacon_pool_attestations_v1(attestations.as_slice())
|
||||
.await
|
||||
.unwrap_err();
|
||||
|
||||
@@ -1749,6 +1767,48 @@ impl ApiTester {
|
||||
|
||||
self
|
||||
}
|
||||
pub async fn test_post_beacon_pool_attestations_invalid_v2(mut self) -> Self {
|
||||
let mut attestations = Vec::new();
|
||||
for attestation in &self.attestations {
|
||||
let mut invalid_attestation = attestation.clone();
|
||||
invalid_attestation.data_mut().slot += 1;
|
||||
|
||||
// add both to ensure we only fail on invalid attestations
|
||||
attestations.push(attestation.clone());
|
||||
attestations.push(invalid_attestation);
|
||||
}
|
||||
|
||||
let fork_name = self
|
||||
.attestations
|
||||
.first()
|
||||
.map(|att| self.chain.spec.fork_name_at_slot::<E>(att.data().slot))
|
||||
.unwrap();
|
||||
|
||||
let err_v2 = self
|
||||
.client
|
||||
.post_beacon_pool_attestations_v2(attestations.as_slice(), fork_name)
|
||||
.await
|
||||
.unwrap_err();
|
||||
|
||||
match err_v2 {
|
||||
Error::ServerIndexedMessage(IndexedErrorMessage {
|
||||
code,
|
||||
message: _,
|
||||
failures,
|
||||
}) => {
|
||||
assert_eq!(code, 400);
|
||||
assert_eq!(failures.len(), self.attestations.len());
|
||||
}
|
||||
_ => panic!("query did not fail correctly"),
|
||||
}
|
||||
|
||||
assert!(
|
||||
self.network_rx.network_recv.recv().await.is_some(),
|
||||
"if some attestations are valid, we should send them to the network"
|
||||
);
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
pub async fn test_get_beacon_light_client_bootstrap(self) -> Self {
|
||||
let block_id = BlockId(CoreBlockId::Finalized);
|
||||
@@ -1812,7 +1872,7 @@ impl ApiTester {
|
||||
pub async fn test_get_beacon_pool_attestations(self) -> Self {
|
||||
let result = self
|
||||
.client
|
||||
.get_beacon_pool_attestations(None, None)
|
||||
.get_beacon_pool_attestations_v1(None, None)
|
||||
.await
|
||||
.unwrap()
|
||||
.data;
|
||||
@@ -1822,12 +1882,20 @@ impl ApiTester {
|
||||
|
||||
assert_eq!(result, expected);
|
||||
|
||||
let result = self
|
||||
.client
|
||||
.get_beacon_pool_attestations_v2(None, None)
|
||||
.await
|
||||
.unwrap()
|
||||
.data;
|
||||
assert_eq!(result, expected);
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
pub async fn test_post_beacon_pool_attester_slashings_valid(mut self) -> Self {
|
||||
pub async fn test_post_beacon_pool_attester_slashings_valid_v1(mut self) -> Self {
|
||||
self.client
|
||||
.post_beacon_pool_attester_slashings(&self.attester_slashing)
|
||||
.post_beacon_pool_attester_slashings_v1(&self.attester_slashing)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -1839,7 +1907,25 @@ impl ApiTester {
|
||||
self
|
||||
}
|
||||
|
||||
pub async fn test_post_beacon_pool_attester_slashings_invalid(mut self) -> Self {
|
||||
pub async fn test_post_beacon_pool_attester_slashings_valid_v2(mut self) -> Self {
|
||||
let fork_name = self
|
||||
.chain
|
||||
.spec
|
||||
.fork_name_at_slot::<E>(self.attester_slashing.attestation_1().data().slot);
|
||||
self.client
|
||||
.post_beacon_pool_attester_slashings_v2(&self.attester_slashing, fork_name)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(
|
||||
self.network_rx.network_recv.recv().await.is_some(),
|
||||
"valid attester slashing should be sent to network"
|
||||
);
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
pub async fn test_post_beacon_pool_attester_slashings_invalid_v1(mut self) -> Self {
|
||||
let mut slashing = self.attester_slashing.clone();
|
||||
match &mut slashing {
|
||||
AttesterSlashing::Base(ref mut slashing) => {
|
||||
@@ -1851,7 +1937,35 @@ impl ApiTester {
|
||||
}
|
||||
|
||||
self.client
|
||||
.post_beacon_pool_attester_slashings(&slashing)
|
||||
.post_beacon_pool_attester_slashings_v1(&slashing)
|
||||
.await
|
||||
.unwrap_err();
|
||||
|
||||
assert!(
|
||||
self.network_rx.network_recv.recv().now_or_never().is_none(),
|
||||
"invalid attester slashing should not be sent to network"
|
||||
);
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
pub async fn test_post_beacon_pool_attester_slashings_invalid_v2(mut self) -> Self {
|
||||
let mut slashing = self.attester_slashing.clone();
|
||||
match &mut slashing {
|
||||
AttesterSlashing::Base(ref mut slashing) => {
|
||||
slashing.attestation_1.data.slot += 1;
|
||||
}
|
||||
AttesterSlashing::Electra(ref mut slashing) => {
|
||||
slashing.attestation_1.data.slot += 1;
|
||||
}
|
||||
}
|
||||
|
||||
let fork_name = self
|
||||
.chain
|
||||
.spec
|
||||
.fork_name_at_slot::<E>(self.attester_slashing.attestation_1().data().slot);
|
||||
self.client
|
||||
.post_beacon_pool_attester_slashings_v2(&slashing, fork_name)
|
||||
.await
|
||||
.unwrap_err();
|
||||
|
||||
@@ -1866,7 +1980,7 @@ impl ApiTester {
|
||||
pub async fn test_get_beacon_pool_attester_slashings(self) -> Self {
|
||||
let result = self
|
||||
.client
|
||||
.get_beacon_pool_attester_slashings()
|
||||
.get_beacon_pool_attester_slashings_v1()
|
||||
.await
|
||||
.unwrap()
|
||||
.data;
|
||||
@@ -1875,6 +1989,14 @@ impl ApiTester {
|
||||
|
||||
assert_eq!(result, expected);
|
||||
|
||||
let result = self
|
||||
.client
|
||||
.get_beacon_pool_attester_slashings_v2()
|
||||
.await
|
||||
.unwrap()
|
||||
.data;
|
||||
assert_eq!(result, expected);
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
@@ -3233,30 +3355,52 @@ impl ApiTester {
|
||||
}
|
||||
|
||||
pub async fn test_get_validator_aggregate_attestation(self) -> Self {
|
||||
let attestation = self
|
||||
if self
|
||||
.chain
|
||||
.head_beacon_block()
|
||||
.message()
|
||||
.body()
|
||||
.attestations()
|
||||
.next()
|
||||
.unwrap()
|
||||
.clone_as_attestation();
|
||||
.spec
|
||||
.fork_name_at_slot::<E>(self.chain.slot().unwrap())
|
||||
.electra_enabled()
|
||||
{
|
||||
for attestation in self.chain.naive_aggregation_pool.read().iter() {
|
||||
let result = self
|
||||
.client
|
||||
.get_validator_aggregate_attestation_v2(
|
||||
attestation.data().slot,
|
||||
attestation.data().tree_hash_root(),
|
||||
attestation.committee_index().unwrap(),
|
||||
)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.data;
|
||||
let expected = attestation;
|
||||
|
||||
let result = self
|
||||
.client
|
||||
.get_validator_aggregate_attestation(
|
||||
attestation.data().slot,
|
||||
attestation.data().tree_hash_root(),
|
||||
)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.data;
|
||||
assert_eq!(&result, expected);
|
||||
}
|
||||
} else {
|
||||
let attestation = self
|
||||
.chain
|
||||
.head_beacon_block()
|
||||
.message()
|
||||
.body()
|
||||
.attestations()
|
||||
.next()
|
||||
.unwrap()
|
||||
.clone_as_attestation();
|
||||
let result = self
|
||||
.client
|
||||
.get_validator_aggregate_attestation_v1(
|
||||
attestation.data().slot,
|
||||
attestation.data().tree_hash_root(),
|
||||
)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.data;
|
||||
let expected = attestation;
|
||||
|
||||
let expected = attestation;
|
||||
|
||||
assert_eq!(result, expected);
|
||||
assert_eq!(result, expected);
|
||||
}
|
||||
|
||||
self
|
||||
}
|
||||
@@ -3355,11 +3499,11 @@ impl ApiTester {
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn test_get_validator_aggregate_and_proofs_valid(mut self) -> Self {
|
||||
pub async fn test_get_validator_aggregate_and_proofs_valid_v1(mut self) -> Self {
|
||||
let aggregate = self.get_aggregate().await;
|
||||
|
||||
self.client
|
||||
.post_validator_aggregate_and_proof::<E>(&[aggregate])
|
||||
.post_validator_aggregate_and_proof_v1::<E>(&[aggregate])
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -3368,7 +3512,7 @@ impl ApiTester {
|
||||
self
|
||||
}
|
||||
|
||||
pub async fn test_get_validator_aggregate_and_proofs_invalid(mut self) -> Self {
|
||||
pub async fn test_get_validator_aggregate_and_proofs_invalid_v1(mut self) -> Self {
|
||||
let mut aggregate = self.get_aggregate().await;
|
||||
match &mut aggregate {
|
||||
SignedAggregateAndProof::Base(ref mut aggregate) => {
|
||||
@@ -3380,7 +3524,7 @@ impl ApiTester {
|
||||
}
|
||||
|
||||
self.client
|
||||
.post_validator_aggregate_and_proof::<E>(&[aggregate])
|
||||
.post_validator_aggregate_and_proof_v1::<E>(&[aggregate.clone()])
|
||||
.await
|
||||
.unwrap_err();
|
||||
|
||||
@@ -3389,6 +3533,46 @@ impl ApiTester {
|
||||
self
|
||||
}
|
||||
|
||||
pub async fn test_get_validator_aggregate_and_proofs_valid_v2(mut self) -> Self {
|
||||
let aggregate = self.get_aggregate().await;
|
||||
let fork_name = self
|
||||
.chain
|
||||
.spec
|
||||
.fork_name_at_slot::<E>(aggregate.message().aggregate().data().slot);
|
||||
self.client
|
||||
.post_validator_aggregate_and_proof_v2::<E>(&[aggregate], fork_name)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(self.network_rx.network_recv.recv().await.is_some());
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
pub async fn test_get_validator_aggregate_and_proofs_invalid_v2(mut self) -> Self {
|
||||
let mut aggregate = self.get_aggregate().await;
|
||||
match &mut aggregate {
|
||||
SignedAggregateAndProof::Base(ref mut aggregate) => {
|
||||
aggregate.message.aggregate.data.slot += 1;
|
||||
}
|
||||
SignedAggregateAndProof::Electra(ref mut aggregate) => {
|
||||
aggregate.message.aggregate.data.slot += 1;
|
||||
}
|
||||
}
|
||||
|
||||
let fork_name = self
|
||||
.chain
|
||||
.spec
|
||||
.fork_name_at_slot::<E>(aggregate.message().aggregate().data().slot);
|
||||
self.client
|
||||
.post_validator_aggregate_and_proof_v2::<E>(&[aggregate], fork_name)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert!(self.network_rx.network_recv.recv().now_or_never().is_none());
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
pub async fn test_get_validator_beacon_committee_subscriptions(mut self) -> Self {
|
||||
let subscription = BeaconCommitteeSubscription {
|
||||
validator_index: 0,
|
||||
@@ -3484,7 +3668,7 @@ impl ApiTester {
|
||||
pub async fn test_post_validator_register_validator_slashed(self) -> Self {
|
||||
// slash a validator
|
||||
self.client
|
||||
.post_beacon_pool_attester_slashings(&self.attester_slashing)
|
||||
.post_beacon_pool_attester_slashings_v1(&self.attester_slashing)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -3597,7 +3781,7 @@ impl ApiTester {
|
||||
|
||||
// Attest to the current slot
|
||||
self.client
|
||||
.post_beacon_pool_attestations(self.attestations.as_slice())
|
||||
.post_beacon_pool_attestations_v1(self.attestations.as_slice())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -5237,7 +5421,7 @@ impl ApiTester {
|
||||
|
||||
// Attest to the current slot
|
||||
self.client
|
||||
.post_beacon_pool_attestations(self.attestations.as_slice())
|
||||
.post_beacon_pool_attestations_v1(self.attestations.as_slice())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -5292,7 +5476,7 @@ impl ApiTester {
|
||||
let expected_attestation_len = self.attestations.len();
|
||||
|
||||
self.client
|
||||
.post_beacon_pool_attestations(self.attestations.as_slice())
|
||||
.post_beacon_pool_attestations_v1(self.attestations.as_slice())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -5801,34 +5985,66 @@ async fn post_beacon_blocks_duplicate() {
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn beacon_pools_post_attestations_valid() {
|
||||
async fn beacon_pools_post_attestations_valid_v1() {
|
||||
ApiTester::new()
|
||||
.await
|
||||
.test_post_beacon_pool_attestations_valid()
|
||||
.test_post_beacon_pool_attestations_valid_v1()
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn beacon_pools_post_attestations_invalid() {
|
||||
async fn beacon_pools_post_attestations_invalid_v1() {
|
||||
ApiTester::new()
|
||||
.await
|
||||
.test_post_beacon_pool_attestations_invalid()
|
||||
.test_post_beacon_pool_attestations_invalid_v1()
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn beacon_pools_post_attester_slashings_valid() {
|
||||
async fn beacon_pools_post_attestations_valid_v2() {
|
||||
ApiTester::new()
|
||||
.await
|
||||
.test_post_beacon_pool_attester_slashings_valid()
|
||||
.test_post_beacon_pool_attestations_valid_v2()
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn beacon_pools_post_attester_slashings_invalid() {
|
||||
async fn beacon_pools_post_attestations_invalid_v2() {
|
||||
ApiTester::new()
|
||||
.await
|
||||
.test_post_beacon_pool_attester_slashings_invalid()
|
||||
.test_post_beacon_pool_attestations_invalid_v2()
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn beacon_pools_post_attester_slashings_valid_v1() {
|
||||
ApiTester::new()
|
||||
.await
|
||||
.test_post_beacon_pool_attester_slashings_valid_v1()
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn beacon_pools_post_attester_slashings_invalid_v1() {
|
||||
ApiTester::new()
|
||||
.await
|
||||
.test_post_beacon_pool_attester_slashings_invalid_v1()
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn beacon_pools_post_attester_slashings_valid_v2() {
|
||||
ApiTester::new()
|
||||
.await
|
||||
.test_post_beacon_pool_attester_slashings_valid_v2()
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn beacon_pools_post_attester_slashings_invalid_v2() {
|
||||
ApiTester::new()
|
||||
.await
|
||||
.test_post_beacon_pool_attester_slashings_invalid_v2()
|
||||
.await;
|
||||
}
|
||||
|
||||
@@ -6156,36 +6372,70 @@ async fn get_validator_aggregate_attestation_with_skip_slots() {
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn get_validator_aggregate_and_proofs_valid() {
|
||||
async fn get_validator_aggregate_and_proofs_valid_v1() {
|
||||
ApiTester::new()
|
||||
.await
|
||||
.test_get_validator_aggregate_and_proofs_valid()
|
||||
.test_get_validator_aggregate_and_proofs_valid_v1()
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn get_validator_aggregate_and_proofs_valid_with_skip_slots() {
|
||||
async fn get_validator_aggregate_and_proofs_valid_with_skip_slots_v1() {
|
||||
ApiTester::new()
|
||||
.await
|
||||
.skip_slots(E::slots_per_epoch() * 2)
|
||||
.test_get_validator_aggregate_and_proofs_valid()
|
||||
.test_get_validator_aggregate_and_proofs_valid_v1()
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn get_validator_aggregate_and_proofs_invalid() {
|
||||
async fn get_validator_aggregate_and_proofs_valid_v2() {
|
||||
ApiTester::new()
|
||||
.await
|
||||
.test_get_validator_aggregate_and_proofs_invalid()
|
||||
.test_get_validator_aggregate_and_proofs_valid_v2()
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn get_validator_aggregate_and_proofs_invalid_with_skip_slots() {
|
||||
async fn get_validator_aggregate_and_proofs_valid_with_skip_slots_v2() {
|
||||
ApiTester::new()
|
||||
.await
|
||||
.skip_slots(E::slots_per_epoch() * 2)
|
||||
.test_get_validator_aggregate_and_proofs_invalid()
|
||||
.test_get_validator_aggregate_and_proofs_valid_v2()
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn get_validator_aggregate_and_proofs_invalid_v1() {
|
||||
ApiTester::new()
|
||||
.await
|
||||
.test_get_validator_aggregate_and_proofs_invalid_v1()
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn get_validator_aggregate_and_proofs_invalid_with_skip_slots_v1() {
|
||||
ApiTester::new()
|
||||
.await
|
||||
.skip_slots(E::slots_per_epoch() * 2)
|
||||
.test_get_validator_aggregate_and_proofs_invalid_v1()
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn get_validator_aggregate_and_proofs_invalid_v2() {
|
||||
ApiTester::new()
|
||||
.await
|
||||
.test_get_validator_aggregate_and_proofs_invalid_v2()
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn get_validator_aggregate_and_proofs_invalid_with_skip_slots_v2() {
|
||||
ApiTester::new()
|
||||
.await
|
||||
.skip_slots(E::slots_per_epoch() * 2)
|
||||
.test_get_validator_aggregate_and_proofs_invalid_v2()
|
||||
.await;
|
||||
}
|
||||
|
||||
|
||||
@@ -103,8 +103,13 @@ impl RateLimiterConfig {
|
||||
pub const DEFAULT_GOODBYE_QUOTA: Quota = Quota::one_every(10);
|
||||
pub const DEFAULT_BLOCKS_BY_RANGE_QUOTA: Quota = Quota::n_every(1024, 10);
|
||||
pub const DEFAULT_BLOCKS_BY_ROOT_QUOTA: Quota = Quota::n_every(128, 10);
|
||||
pub const DEFAULT_BLOBS_BY_RANGE_QUOTA: Quota = Quota::n_every(768, 10);
|
||||
pub const DEFAULT_BLOBS_BY_ROOT_QUOTA: Quota = Quota::n_every(128, 10);
|
||||
// `BlocksByRange` and `BlobsByRange` are sent together during range sync.
|
||||
// It makes sense for blocks and blobs quotas to be equivalent in terms of the number of blocks:
|
||||
// 1024 blocks * 6 max blobs per block.
|
||||
// This doesn't necessarily mean that we are sending this many blobs, because the quotas are
|
||||
// measured against the maximum request size.
|
||||
pub const DEFAULT_BLOBS_BY_RANGE_QUOTA: Quota = Quota::n_every(6144, 10);
|
||||
pub const DEFAULT_BLOBS_BY_ROOT_QUOTA: Quota = Quota::n_every(768, 10);
|
||||
pub const DEFAULT_LIGHT_CLIENT_BOOTSTRAP_QUOTA: Quota = Quota::one_every(10);
|
||||
pub const DEFAULT_LIGHT_CLIENT_OPTIMISTIC_UPDATE_QUOTA: Quota = Quota::one_every(10);
|
||||
pub const DEFAULT_LIGHT_CLIENT_FINALITY_UPDATE_QUOTA: Quota = Quota::one_every(10);
|
||||
|
||||
@@ -237,6 +237,36 @@ lazy_static! {
|
||||
"Number of Syncing chains in range, per range type",
|
||||
&["range_type"]
|
||||
);
|
||||
pub static ref SYNCING_CHAINS_REMOVED: Result<IntCounterVec> = try_create_int_counter_vec(
|
||||
"sync_range_removed_chains_total",
|
||||
"Total count of range syncing chains removed per range type",
|
||||
&["range_type"]
|
||||
);
|
||||
pub static ref SYNCING_CHAINS_ADDED: Result<IntCounterVec> = try_create_int_counter_vec(
|
||||
"sync_range_added_chains_total",
|
||||
"Total count of range syncing chains added per range type",
|
||||
&["range_type"]
|
||||
);
|
||||
pub static ref SYNCING_CHAINS_DROPPED_BLOCKS: Result<IntCounterVec> = try_create_int_counter_vec(
|
||||
"sync_range_chains_dropped_blocks_total",
|
||||
"Total count of dropped blocks when removing a syncing chain per range type",
|
||||
&["range_type"]
|
||||
);
|
||||
pub static ref SYNCING_CHAINS_IGNORED_BLOCKS: Result<IntCounterVec> = try_create_int_counter_vec(
|
||||
"sync_range_chains_ignored_blocks_total",
|
||||
"Total count of ignored blocks when processing a syncing chain batch per chain type",
|
||||
&["chain_type"]
|
||||
);
|
||||
pub static ref SYNCING_CHAINS_PROCESSED_BATCHES: Result<IntCounterVec> = try_create_int_counter_vec(
|
||||
"sync_range_chains_processed_batches_total",
|
||||
"Total count of processed batches in a syncing chain batch per chain type",
|
||||
&["chain_type"]
|
||||
);
|
||||
pub static ref SYNCING_CHAIN_BATCH_AWAITING_PROCESSING: Result<Histogram> = try_create_histogram_with_buckets(
|
||||
"sync_range_chain_batch_awaiting_processing_seconds",
|
||||
"Time range sync batches spend in AwaitingProcessing state",
|
||||
Ok(vec![0.01,0.02,0.05,0.1,0.2,0.5,1.0,2.0,5.0,10.0,20.0])
|
||||
);
|
||||
pub static ref SYNC_SINGLE_BLOCK_LOOKUPS: Result<IntGauge> = try_create_int_gauge(
|
||||
"sync_single_block_lookups",
|
||||
"Number of single block lookups underway"
|
||||
|
||||
@@ -326,7 +326,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
.process_blocks(downloaded_blocks.iter(), notify_execution_layer)
|
||||
.await
|
||||
{
|
||||
(_, Ok(_)) => {
|
||||
(imported_blocks, Ok(_)) => {
|
||||
debug!(self.log, "Batch processed";
|
||||
"batch_epoch" => epoch,
|
||||
"first_block_slot" => start_slot,
|
||||
@@ -335,7 +335,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
"processed_blocks" => sent_blocks,
|
||||
"service"=> "sync");
|
||||
BatchProcessResult::Success {
|
||||
was_non_empty: sent_blocks > 0,
|
||||
sent_blocks,
|
||||
imported_blocks,
|
||||
}
|
||||
}
|
||||
(imported_blocks, Err(e)) => {
|
||||
@@ -349,7 +350,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
"service" => "sync");
|
||||
match e.peer_action {
|
||||
Some(penalty) => BatchProcessResult::FaultyFailure {
|
||||
imported_blocks: imported_blocks > 0,
|
||||
imported_blocks,
|
||||
penalty,
|
||||
},
|
||||
None => BatchProcessResult::NonFaultyFailure,
|
||||
@@ -368,7 +369,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
.sum::<usize>();
|
||||
|
||||
match self.process_backfill_blocks(downloaded_blocks) {
|
||||
(_, Ok(_)) => {
|
||||
(imported_blocks, Ok(_)) => {
|
||||
debug!(self.log, "Backfill batch processed";
|
||||
"batch_epoch" => epoch,
|
||||
"first_block_slot" => start_slot,
|
||||
@@ -377,7 +378,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
"processed_blobs" => n_blobs,
|
||||
"service"=> "sync");
|
||||
BatchProcessResult::Success {
|
||||
was_non_empty: sent_blocks > 0,
|
||||
sent_blocks,
|
||||
imported_blocks,
|
||||
}
|
||||
}
|
||||
(_, Err(e)) => {
|
||||
@@ -390,7 +392,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
"service" => "sync");
|
||||
match e.peer_action {
|
||||
Some(penalty) => BatchProcessResult::FaultyFailure {
|
||||
imported_blocks: false,
|
||||
imported_blocks: 0,
|
||||
penalty,
|
||||
},
|
||||
None => BatchProcessResult::NonFaultyFailure,
|
||||
|
||||
@@ -528,7 +528,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
|
||||
// result callback. This is done, because an empty batch could end a chain and the logic
|
||||
// for removing chains and checking completion is in the callback.
|
||||
|
||||
let blocks = match batch.start_processing() {
|
||||
let (blocks, _) = match batch.start_processing() {
|
||||
Err(e) => {
|
||||
return self
|
||||
.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))
|
||||
@@ -615,13 +615,15 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
|
||||
"batch_epoch" => batch_id, "peer" => %peer, "client" => %network.client_type(&peer));
|
||||
|
||||
match result {
|
||||
BatchProcessResult::Success { was_non_empty } => {
|
||||
BatchProcessResult::Success {
|
||||
imported_blocks, ..
|
||||
} => {
|
||||
if let Err(e) = batch.processing_completed(BatchProcessingResult::Success) {
|
||||
self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?;
|
||||
}
|
||||
// If the processed batch was not empty, we can validate previous unvalidated
|
||||
// blocks.
|
||||
if *was_non_empty {
|
||||
if *imported_blocks > 0 {
|
||||
self.advance_chain(network, batch_id);
|
||||
}
|
||||
|
||||
@@ -677,7 +679,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
|
||||
|
||||
Ok(BatchOperationOutcome::Continue) => {
|
||||
// chain can continue. Check if it can be progressed
|
||||
if *imported_blocks {
|
||||
if *imported_blocks > 0 {
|
||||
// At least one block was successfully verified and imported, then we can be sure all
|
||||
// previous batches are valid and we only need to download the current failed
|
||||
// batch.
|
||||
|
||||
@@ -156,11 +156,12 @@ pub enum BlockProcessingResult<E: EthSpec> {
|
||||
pub enum BatchProcessResult {
|
||||
/// The batch was completed successfully. It carries whether the sent batch contained blocks.
|
||||
Success {
|
||||
was_non_empty: bool,
|
||||
sent_blocks: usize,
|
||||
imported_blocks: usize,
|
||||
},
|
||||
/// The batch processing failed. It carries whether the processing imported any block.
|
||||
FaultyFailure {
|
||||
imported_blocks: bool,
|
||||
imported_blocks: usize,
|
||||
penalty: PeerAction,
|
||||
},
|
||||
NonFaultyFailure,
|
||||
|
||||
@@ -5,6 +5,7 @@ use lighthouse_network::PeerId;
|
||||
use std::collections::HashSet;
|
||||
use std::hash::{Hash, Hasher};
|
||||
use std::ops::Sub;
|
||||
use std::time::{Duration, Instant};
|
||||
use strum::Display;
|
||||
use types::{Epoch, EthSpec, Slot};
|
||||
|
||||
@@ -118,7 +119,7 @@ pub enum BatchState<E: EthSpec> {
|
||||
/// The batch is being downloaded.
|
||||
Downloading(PeerId, Id),
|
||||
/// The batch has been completely downloaded and is ready for processing.
|
||||
AwaitingProcessing(PeerId, Vec<RpcBlock<E>>),
|
||||
AwaitingProcessing(PeerId, Vec<RpcBlock<E>>, Instant),
|
||||
/// The batch is being processed.
|
||||
Processing(Attempt),
|
||||
/// The batch was successfully processed and is waiting to be validated.
|
||||
@@ -210,13 +211,26 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
|
||||
match &self.state {
|
||||
BatchState::AwaitingDownload | BatchState::Failed => None,
|
||||
BatchState::Downloading(peer_id, _)
|
||||
| BatchState::AwaitingProcessing(peer_id, _)
|
||||
| BatchState::AwaitingProcessing(peer_id, _, _)
|
||||
| BatchState::Processing(Attempt { peer_id, .. })
|
||||
| BatchState::AwaitingValidation(Attempt { peer_id, .. }) => Some(peer_id),
|
||||
BatchState::Poisoned => unreachable!("Poisoned batch"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the count of stored pending blocks if in awaiting processing state
|
||||
pub fn pending_blocks(&self) -> usize {
|
||||
match &self.state {
|
||||
BatchState::AwaitingProcessing(_, blocks, _) => blocks.len(),
|
||||
BatchState::AwaitingDownload
|
||||
| BatchState::Downloading { .. }
|
||||
| BatchState::Processing { .. }
|
||||
| BatchState::AwaitingValidation { .. }
|
||||
| BatchState::Poisoned
|
||||
| BatchState::Failed => 0,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a BlocksByRange request associated with the batch.
|
||||
pub fn to_blocks_by_range_request(&self) -> (BlocksByRangeRequest, ByRangeRequestType) {
|
||||
(
|
||||
@@ -293,7 +307,7 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
|
||||
}
|
||||
|
||||
let received = blocks.len();
|
||||
self.state = BatchState::AwaitingProcessing(peer, blocks);
|
||||
self.state = BatchState::AwaitingProcessing(peer, blocks, Instant::now());
|
||||
Ok(received)
|
||||
}
|
||||
BatchState::Poisoned => unreachable!("Poisoned batch"),
|
||||
@@ -365,11 +379,11 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn start_processing(&mut self) -> Result<Vec<RpcBlock<E>>, WrongState> {
|
||||
pub fn start_processing(&mut self) -> Result<(Vec<RpcBlock<E>>, Duration), WrongState> {
|
||||
match self.state.poison() {
|
||||
BatchState::AwaitingProcessing(peer, blocks) => {
|
||||
BatchState::AwaitingProcessing(peer, blocks, start_instant) => {
|
||||
self.state = BatchState::Processing(Attempt::new::<B, E>(peer, &blocks));
|
||||
Ok(blocks)
|
||||
Ok((blocks, start_instant.elapsed()))
|
||||
}
|
||||
BatchState::Poisoned => unreachable!("Poisoned batch"),
|
||||
other => {
|
||||
@@ -515,7 +529,7 @@ impl<E: EthSpec> std::fmt::Debug for BatchState<E> {
|
||||
}) => write!(f, "AwaitingValidation({})", peer_id),
|
||||
BatchState::AwaitingDownload => f.write_str("AwaitingDownload"),
|
||||
BatchState::Failed => f.write_str("Failed"),
|
||||
BatchState::AwaitingProcessing(ref peer, ref blocks) => {
|
||||
BatchState::AwaitingProcessing(ref peer, ref blocks, _) => {
|
||||
write!(f, "AwaitingProcessing({}, {} blocks)", peer, blocks.len())
|
||||
}
|
||||
BatchState::Downloading(peer, request_id) => {
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
use super::batch::{BatchInfo, BatchProcessingResult, BatchState};
|
||||
use super::RangeSyncType;
|
||||
use crate::metrics;
|
||||
use crate::network_beacon_processor::ChainSegmentProcessId;
|
||||
use crate::sync::network_context::RangeRequestId;
|
||||
use crate::sync::{network_context::SyncNetworkContext, BatchOperationOutcome, BatchProcessResult};
|
||||
@@ -11,6 +13,7 @@ use rand::{seq::SliceRandom, Rng};
|
||||
use slog::{crit, debug, o, warn};
|
||||
use std::collections::{btree_map::Entry, BTreeMap, HashSet};
|
||||
use std::hash::{Hash, Hasher};
|
||||
use strum::IntoStaticStr;
|
||||
use types::{Epoch, EthSpec, Hash256, Slot};
|
||||
|
||||
/// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of
|
||||
@@ -53,6 +56,13 @@ pub struct KeepChain;
|
||||
pub type ChainId = u64;
|
||||
pub type BatchId = Epoch;
|
||||
|
||||
#[derive(Debug, Copy, Clone, IntoStaticStr)]
|
||||
pub enum SyncingChainType {
|
||||
Head,
|
||||
Finalized,
|
||||
Backfill,
|
||||
}
|
||||
|
||||
/// A chain of blocks that need to be downloaded. Peers who claim to contain the target head
|
||||
/// root are grouped into the peer pool and queried for batches when downloading the
|
||||
/// chain.
|
||||
@@ -60,6 +70,9 @@ pub struct SyncingChain<T: BeaconChainTypes> {
|
||||
/// A random id used to identify this chain.
|
||||
id: ChainId,
|
||||
|
||||
/// SyncingChain type
|
||||
pub chain_type: SyncingChainType,
|
||||
|
||||
/// The start of the chain segment. Any epoch previous to this one has been validated.
|
||||
pub start_epoch: Epoch,
|
||||
|
||||
@@ -126,6 +139,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
target_head_slot: Slot,
|
||||
target_head_root: Hash256,
|
||||
peer_id: PeerId,
|
||||
chain_type: SyncingChainType,
|
||||
log: &slog::Logger,
|
||||
) -> Self {
|
||||
let mut peers = FnvHashMap::default();
|
||||
@@ -135,6 +149,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
|
||||
SyncingChain {
|
||||
id,
|
||||
chain_type,
|
||||
start_epoch,
|
||||
target_head_slot,
|
||||
target_head_root,
|
||||
@@ -171,6 +186,14 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
self.validated_batches * EPOCHS_PER_BATCH
|
||||
}
|
||||
|
||||
/// Returns the total count of pending blocks in all the batches of this chain
|
||||
pub fn pending_blocks(&self) -> usize {
|
||||
self.batches
|
||||
.values()
|
||||
.map(|batch| batch.pending_blocks())
|
||||
.sum()
|
||||
}
|
||||
|
||||
/// Removes a peer from the chain.
|
||||
/// If the peer has active batches, those are considered failed and re-requested.
|
||||
pub fn remove_peer(
|
||||
@@ -305,7 +328,12 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
// result callback. This is done, because an empty batch could end a chain and the logic
|
||||
// for removing chains and checking completion is in the callback.
|
||||
|
||||
let blocks = batch.start_processing()?;
|
||||
let (blocks, duration_in_awaiting_processing) = batch.start_processing()?;
|
||||
metrics::observe_duration(
|
||||
&metrics::SYNCING_CHAIN_BATCH_AWAITING_PROCESSING,
|
||||
duration_in_awaiting_processing,
|
||||
);
|
||||
|
||||
let process_id = ChainSegmentProcessId::RangeBatchId(self.id, batch_id);
|
||||
self.current_processing_batch = Some(batch_id);
|
||||
|
||||
@@ -469,10 +497,27 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
// We consider three cases. Batch was successfully processed, Batch failed processing due
|
||||
// to a faulty peer, or batch failed processing but the peer can't be deemed faulty.
|
||||
match result {
|
||||
BatchProcessResult::Success { was_non_empty } => {
|
||||
BatchProcessResult::Success {
|
||||
sent_blocks,
|
||||
imported_blocks,
|
||||
} => {
|
||||
if sent_blocks > imported_blocks {
|
||||
let ignored_blocks = sent_blocks - imported_blocks;
|
||||
metrics::inc_counter_vec_by(
|
||||
&metrics::SYNCING_CHAINS_IGNORED_BLOCKS,
|
||||
&[self.chain_type.into()],
|
||||
ignored_blocks as u64,
|
||||
);
|
||||
}
|
||||
metrics::inc_counter_vec(
|
||||
&metrics::SYNCING_CHAINS_PROCESSED_BATCHES,
|
||||
&[self.chain_type.into()],
|
||||
);
|
||||
|
||||
batch.processing_completed(BatchProcessingResult::Success)?;
|
||||
|
||||
if *was_non_empty {
|
||||
// was not empty = sent_blocks > 0
|
||||
if *sent_blocks > 0 {
|
||||
// If the processed batch was not empty, we can validate previous unvalidated
|
||||
// blocks.
|
||||
self.advance_chain(network, batch_id);
|
||||
@@ -515,7 +560,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
match batch.processing_completed(BatchProcessingResult::FaultyFailure)? {
|
||||
BatchOperationOutcome::Continue => {
|
||||
// Chain can continue. Check if it can be moved forward.
|
||||
if *imported_blocks {
|
||||
if *imported_blocks > 0 {
|
||||
// At least one block was successfully verified and imported, so we can be sure all
|
||||
// previous batches are valid and we only need to download the current failed
|
||||
// batch.
|
||||
@@ -1142,3 +1187,12 @@ impl RemoveChain {
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<RangeSyncType> for SyncingChainType {
|
||||
fn from(value: RangeSyncType) -> Self {
|
||||
match value {
|
||||
RangeSyncType::Head => Self::Head,
|
||||
RangeSyncType::Finalized => Self::Finalized,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -64,8 +64,8 @@ impl<T: BeaconChainTypes, C: BlockStorage> ChainCollection<T, C> {
|
||||
|
||||
/// Updates the Syncing state of the collection after a chain is removed.
|
||||
fn on_chain_removed(&mut self, id: &ChainId, was_syncing: bool, sync_type: RangeSyncType) {
|
||||
let _ = metrics::get_int_gauge(&metrics::SYNCING_CHAINS_COUNT, &[sync_type.as_str()])
|
||||
.map(|m| m.dec());
|
||||
metrics::inc_counter_vec(&metrics::SYNCING_CHAINS_REMOVED, &[sync_type.as_str()]);
|
||||
self.update_metrics();
|
||||
|
||||
match self.state {
|
||||
RangeSyncState::Finalized(ref syncing_id) => {
|
||||
@@ -493,15 +493,28 @@ impl<T: BeaconChainTypes, C: BlockStorage> ChainCollection<T, C> {
|
||||
target_head_slot,
|
||||
target_head_root,
|
||||
peer,
|
||||
sync_type.into(),
|
||||
&self.log,
|
||||
);
|
||||
debug_assert_eq!(new_chain.get_id(), id);
|
||||
debug!(self.log, "New chain added to sync"; "peer_id" => peer_rpr, "sync_type" => ?sync_type, &new_chain);
|
||||
entry.insert(new_chain);
|
||||
let _ =
|
||||
metrics::get_int_gauge(&metrics::SYNCING_CHAINS_COUNT, &[sync_type.as_str()])
|
||||
.map(|m| m.inc());
|
||||
metrics::inc_counter_vec(&metrics::SYNCING_CHAINS_ADDED, &[sync_type.as_str()]);
|
||||
self.update_metrics();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn update_metrics(&self) {
|
||||
metrics::set_gauge_vec(
|
||||
&metrics::SYNCING_CHAINS_COUNT,
|
||||
&[RangeSyncType::Finalized.as_str()],
|
||||
self.finalized_chains.len() as i64,
|
||||
);
|
||||
metrics::set_gauge_vec(
|
||||
&metrics::SYNCING_CHAINS_COUNT,
|
||||
&[RangeSyncType::Head.as_str()],
|
||||
self.head_chains.len() as i64,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -43,6 +43,7 @@ use super::block_storage::BlockStorage;
|
||||
use super::chain::{BatchId, ChainId, RemoveChain, SyncingChain};
|
||||
use super::chain_collection::ChainCollection;
|
||||
use super::sync_type::RangeSyncType;
|
||||
use crate::metrics;
|
||||
use crate::status::ToStatusMessage;
|
||||
use crate::sync::network_context::SyncNetworkContext;
|
||||
use crate::sync::BatchProcessResult;
|
||||
@@ -346,6 +347,12 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
metrics::inc_counter_vec_by(
|
||||
&metrics::SYNCING_CHAINS_DROPPED_BLOCKS,
|
||||
&[sync_type.as_str()],
|
||||
chain.pending_blocks() as u64,
|
||||
);
|
||||
|
||||
network.status_peers(self.beacon_chain.as_ref(), chain.peers());
|
||||
|
||||
let status = self.beacon_chain.status_message();
|
||||
|
||||
@@ -372,17 +372,22 @@ pub fn cli_app() -> Command {
|
||||
.arg(
|
||||
Arg::new("self-limiter")
|
||||
.long("self-limiter")
|
||||
.help(
|
||||
"Enables the outbound rate limiter (requests made by this node). \
|
||||
Use the self-limiter-protocol flag to set per protocol configurations. \
|
||||
If the self rate limiter is enabled and a protocol is not \
|
||||
present in the configuration, the quotas used for the inbound rate limiter will be \
|
||||
used."
|
||||
)
|
||||
.help("This flag is deprecated and has no effect.")
|
||||
.hide(true)
|
||||
.action(ArgAction::SetTrue)
|
||||
.help_heading(FLAG_HEADER)
|
||||
.display_order(0)
|
||||
)
|
||||
.arg(
|
||||
Arg::new("disable-self-limiter")
|
||||
.long("disable-self-limiter")
|
||||
.help(
|
||||
"Disables the outbound rate limiter (requests sent by this node)."
|
||||
)
|
||||
.action(ArgAction::SetTrue)
|
||||
.help_heading(FLAG_HEADER)
|
||||
.display_order(0)
|
||||
)
|
||||
.arg(
|
||||
Arg::new("self-limiter-protocols")
|
||||
.long("self-limiter-protocols")
|
||||
@@ -397,7 +402,7 @@ pub fn cli_app() -> Command {
|
||||
)
|
||||
.action(ArgAction::Append)
|
||||
.value_delimiter(';')
|
||||
.requires("self-limiter")
|
||||
.conflicts_with("disable-self-limiter")
|
||||
.display_order(0)
|
||||
)
|
||||
.arg(
|
||||
|
||||
@@ -1416,16 +1416,15 @@ pub fn set_network_config(
|
||||
// Light client server config.
|
||||
config.enable_light_client_server = parse_flag(cli_args, "light-client-server");
|
||||
|
||||
// The self limiter is disabled by default. If the `self-limiter` flag is provided
|
||||
// without the `self-limiter-protocols` flag, the default params will be used.
|
||||
if parse_flag(cli_args, "self-limiter") {
|
||||
config.outbound_rate_limiter_config =
|
||||
if let Some(protocols) = cli_args.get_one::<String>("self-limiter-protocols") {
|
||||
Some(protocols.parse()?)
|
||||
} else {
|
||||
Some(Default::default())
|
||||
};
|
||||
}
|
||||
// The self limiter is enabled by default. If the `self-limiter-protocols` flag is not provided,
|
||||
// the default params will be used.
|
||||
config.outbound_rate_limiter_config = if parse_flag(cli_args, "disable-self-limiter") {
|
||||
None
|
||||
} else if let Some(protocols) = cli_args.get_one::<String>("self-limiter-protocols") {
|
||||
Some(protocols.parse()?)
|
||||
} else {
|
||||
Some(Default::default())
|
||||
};
|
||||
|
||||
// Proposer-only mode overrides a number of previous configuration parameters.
|
||||
// Specifically, we avoid subscribing to long-lived subnets and wish to maintain a minimal set
|
||||
|
||||
@@ -505,6 +505,8 @@ Flags:
|
||||
--disable-quic
|
||||
Disables the quic transport. The node will rely solely on the TCP
|
||||
transport for libp2p connections.
|
||||
--disable-self-limiter
|
||||
Disables the outbound rate limiter (requests sent by this node).
|
||||
--disable-upnp
|
||||
Disables UPnP support. Setting this will prevent Lighthouse from
|
||||
attempting to automatically establish external port mappings.
|
||||
@@ -575,12 +577,6 @@ Flags:
|
||||
When present, Lighthouse will forget the payload statuses of any
|
||||
already-imported blocks. This can assist in the recovery from a
|
||||
consensus failure caused by the execution layer.
|
||||
--self-limiter
|
||||
Enables the outbound rate limiter (requests made by this node). Use
|
||||
the self-limiter-protocol flag to set per protocol configurations. If
|
||||
the self rate limiter is enabled and a protocol is not present in the
|
||||
configuration, the quotas used for the inbound rate limiter will be
|
||||
used.
|
||||
--shutdown-after-sync
|
||||
Shutdown beacon node as soon as sync is completed. Backfill sync will
|
||||
not be performed before shutdown.
|
||||
|
||||
@@ -346,6 +346,19 @@ impl BeaconNodeHttpClient {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Perform a HTTP POST request with a custom timeout and consensus header.
|
||||
async fn post_with_timeout_and_consensus_header<T: Serialize, U: IntoUrl>(
|
||||
&self,
|
||||
url: U,
|
||||
body: &T,
|
||||
timeout: Duration,
|
||||
fork_name: ForkName,
|
||||
) -> Result<(), Error> {
|
||||
self.post_generic_with_consensus_version(url, body, Some(timeout), fork_name)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Perform a HTTP POST request with a custom timeout, returning a JSON response.
|
||||
async fn post_with_timeout_and_response<T: DeserializeOwned, U: IntoUrl, V: Serialize>(
|
||||
&self,
|
||||
@@ -376,25 +389,6 @@ impl BeaconNodeHttpClient {
|
||||
ok_or_error(response).await
|
||||
}
|
||||
|
||||
/// Generic POST function supporting arbitrary responses and timeouts.
|
||||
/// Does not include Content-Type application/json in the request header.
|
||||
async fn post_generic_json_without_content_type_header<T: Serialize, U: IntoUrl>(
|
||||
&self,
|
||||
url: U,
|
||||
body: &T,
|
||||
timeout: Option<Duration>,
|
||||
) -> Result<Response, Error> {
|
||||
let mut builder = self.client.post(url);
|
||||
if let Some(timeout) = timeout {
|
||||
builder = builder.timeout(timeout);
|
||||
}
|
||||
|
||||
let serialized_body = serde_json::to_vec(body).map_err(Error::InvalidJson)?;
|
||||
|
||||
let response = builder.body(serialized_body).send().await?;
|
||||
ok_or_error(response).await
|
||||
}
|
||||
|
||||
/// Generic POST function supporting arbitrary responses and timeouts.
|
||||
async fn post_generic_with_consensus_version<T: Serialize, U: IntoUrl>(
|
||||
&self,
|
||||
@@ -1228,10 +1222,10 @@ impl BeaconNodeHttpClient {
|
||||
self.get_opt(path).await
|
||||
}
|
||||
|
||||
/// `GET beacon/blocks/{block_id}/attestations`
|
||||
/// `GET v1/beacon/blocks/{block_id}/attestations`
|
||||
///
|
||||
/// Returns `Ok(None)` on a 404 error.
|
||||
pub async fn get_beacon_blocks_attestations<E: EthSpec>(
|
||||
pub async fn get_beacon_blocks_attestations_v1<E: EthSpec>(
|
||||
&self,
|
||||
block_id: BlockId,
|
||||
) -> Result<Option<ExecutionOptimisticFinalizedResponse<Vec<Attestation<E>>>>, Error> {
|
||||
@@ -1247,8 +1241,28 @@ impl BeaconNodeHttpClient {
|
||||
self.get_opt(path).await
|
||||
}
|
||||
|
||||
/// `POST beacon/pool/attestations`
|
||||
pub async fn post_beacon_pool_attestations<E: EthSpec>(
|
||||
/// `GET v2/beacon/blocks/{block_id}/attestations`
|
||||
///
|
||||
/// Returns `Ok(None)` on a 404 error.
|
||||
pub async fn get_beacon_blocks_attestations_v2<E: EthSpec>(
|
||||
&self,
|
||||
block_id: BlockId,
|
||||
) -> Result<Option<ExecutionOptimisticFinalizedForkVersionedResponse<Vec<Attestation<E>>>>, Error>
|
||||
{
|
||||
let mut path = self.eth_path(V2)?;
|
||||
|
||||
path.path_segments_mut()
|
||||
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
|
||||
.push("beacon")
|
||||
.push("blocks")
|
||||
.push(&block_id.to_string())
|
||||
.push("attestations");
|
||||
|
||||
self.get_opt(path).await
|
||||
}
|
||||
|
||||
/// `POST v1/beacon/pool/attestations`
|
||||
pub async fn post_beacon_pool_attestations_v1<E: EthSpec>(
|
||||
&self,
|
||||
attestations: &[Attestation<E>],
|
||||
) -> Result<(), Error> {
|
||||
@@ -1266,8 +1280,33 @@ impl BeaconNodeHttpClient {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// `GET beacon/pool/attestations?slot,committee_index`
|
||||
pub async fn get_beacon_pool_attestations<E: EthSpec>(
|
||||
/// `POST v2/beacon/pool/attestations`
|
||||
pub async fn post_beacon_pool_attestations_v2<E: EthSpec>(
|
||||
&self,
|
||||
attestations: &[Attestation<E>],
|
||||
fork_name: ForkName,
|
||||
) -> Result<(), Error> {
|
||||
let mut path = self.eth_path(V2)?;
|
||||
|
||||
path.path_segments_mut()
|
||||
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
|
||||
.push("beacon")
|
||||
.push("pool")
|
||||
.push("attestations");
|
||||
|
||||
self.post_with_timeout_and_consensus_header(
|
||||
path,
|
||||
&attestations,
|
||||
self.timeouts.attestation,
|
||||
fork_name,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// `GET v1/beacon/pool/attestations?slot,committee_index`
|
||||
pub async fn get_beacon_pool_attestations_v1<E: EthSpec>(
|
||||
&self,
|
||||
slot: Option<Slot>,
|
||||
committee_index: Option<u64>,
|
||||
@@ -1293,8 +1332,35 @@ impl BeaconNodeHttpClient {
|
||||
self.get(path).await
|
||||
}
|
||||
|
||||
/// `POST beacon/pool/attester_slashings`
|
||||
pub async fn post_beacon_pool_attester_slashings<E: EthSpec>(
|
||||
/// `GET v2/beacon/pool/attestations?slot,committee_index`
|
||||
pub async fn get_beacon_pool_attestations_v2<E: EthSpec>(
|
||||
&self,
|
||||
slot: Option<Slot>,
|
||||
committee_index: Option<u64>,
|
||||
) -> Result<ForkVersionedResponse<Vec<Attestation<E>>>, Error> {
|
||||
let mut path = self.eth_path(V2)?;
|
||||
|
||||
path.path_segments_mut()
|
||||
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
|
||||
.push("beacon")
|
||||
.push("pool")
|
||||
.push("attestations");
|
||||
|
||||
if let Some(slot) = slot {
|
||||
path.query_pairs_mut()
|
||||
.append_pair("slot", &slot.to_string());
|
||||
}
|
||||
|
||||
if let Some(index) = committee_index {
|
||||
path.query_pairs_mut()
|
||||
.append_pair("committee_index", &index.to_string());
|
||||
}
|
||||
|
||||
self.get(path).await
|
||||
}
|
||||
|
||||
/// `POST v1/beacon/pool/attester_slashings`
|
||||
pub async fn post_beacon_pool_attester_slashings_v1<E: EthSpec>(
|
||||
&self,
|
||||
slashing: &AttesterSlashing<E>,
|
||||
) -> Result<(), Error> {
|
||||
@@ -1306,14 +1372,33 @@ impl BeaconNodeHttpClient {
|
||||
.push("pool")
|
||||
.push("attester_slashings");
|
||||
|
||||
self.post_generic_json_without_content_type_header(path, slashing, None)
|
||||
self.post_generic(path, slashing, None).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// `POST v2/beacon/pool/attester_slashings`
|
||||
pub async fn post_beacon_pool_attester_slashings_v2<E: EthSpec>(
|
||||
&self,
|
||||
slashing: &AttesterSlashing<E>,
|
||||
fork_name: ForkName,
|
||||
) -> Result<(), Error> {
|
||||
let mut path = self.eth_path(V2)?;
|
||||
|
||||
path.path_segments_mut()
|
||||
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
|
||||
.push("beacon")
|
||||
.push("pool")
|
||||
.push("attester_slashings");
|
||||
|
||||
self.post_generic_with_consensus_version(path, slashing, None, fork_name)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// `GET beacon/pool/attester_slashings`
|
||||
pub async fn get_beacon_pool_attester_slashings<E: EthSpec>(
|
||||
/// `GET v1/beacon/pool/attester_slashings`
|
||||
pub async fn get_beacon_pool_attester_slashings_v1<E: EthSpec>(
|
||||
&self,
|
||||
) -> Result<GenericResponse<Vec<AttesterSlashing<E>>>, Error> {
|
||||
let mut path = self.eth_path(V1)?;
|
||||
@@ -1327,6 +1412,21 @@ impl BeaconNodeHttpClient {
|
||||
self.get(path).await
|
||||
}
|
||||
|
||||
/// `GET v2/beacon/pool/attester_slashings`
|
||||
pub async fn get_beacon_pool_attester_slashings_v2<E: EthSpec>(
|
||||
&self,
|
||||
) -> Result<ForkVersionedResponse<Vec<AttesterSlashing<E>>>, Error> {
|
||||
let mut path = self.eth_path(V2)?;
|
||||
|
||||
path.path_segments_mut()
|
||||
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
|
||||
.push("beacon")
|
||||
.push("pool")
|
||||
.push("attester_slashings");
|
||||
|
||||
self.get(path).await
|
||||
}
|
||||
|
||||
/// `POST beacon/pool/proposer_slashings`
|
||||
pub async fn post_beacon_pool_proposer_slashings(
|
||||
&self,
|
||||
@@ -2216,8 +2316,8 @@ impl BeaconNodeHttpClient {
|
||||
self.get_with_timeout(path, self.timeouts.attestation).await
|
||||
}
|
||||
|
||||
/// `GET validator/aggregate_attestation?slot,attestation_data_root`
|
||||
pub async fn get_validator_aggregate_attestation<E: EthSpec>(
|
||||
/// `GET v1/validator/aggregate_attestation?slot,attestation_data_root`
|
||||
pub async fn get_validator_aggregate_attestation_v1<E: EthSpec>(
|
||||
&self,
|
||||
slot: Slot,
|
||||
attestation_data_root: Hash256,
|
||||
@@ -2240,6 +2340,32 @@ impl BeaconNodeHttpClient {
|
||||
.await
|
||||
}
|
||||
|
||||
/// `GET v2/validator/aggregate_attestation?slot,attestation_data_root,committee_index`
|
||||
pub async fn get_validator_aggregate_attestation_v2<E: EthSpec>(
|
||||
&self,
|
||||
slot: Slot,
|
||||
attestation_data_root: Hash256,
|
||||
committee_index: CommitteeIndex,
|
||||
) -> Result<Option<ForkVersionedResponse<Attestation<E>>>, Error> {
|
||||
let mut path = self.eth_path(V2)?;
|
||||
|
||||
path.path_segments_mut()
|
||||
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
|
||||
.push("validator")
|
||||
.push("aggregate_attestation");
|
||||
|
||||
path.query_pairs_mut()
|
||||
.append_pair("slot", &slot.to_string())
|
||||
.append_pair(
|
||||
"attestation_data_root",
|
||||
&format!("{:?}", attestation_data_root),
|
||||
)
|
||||
.append_pair("committee_index", &committee_index.to_string());
|
||||
|
||||
self.get_opt_with_timeout(path, self.timeouts.attestation)
|
||||
.await
|
||||
}
|
||||
|
||||
/// `GET validator/sync_committee_contribution`
|
||||
pub async fn get_validator_sync_committee_contribution<E: EthSpec>(
|
||||
&self,
|
||||
@@ -2335,8 +2461,8 @@ impl BeaconNodeHttpClient {
|
||||
.await
|
||||
}
|
||||
|
||||
/// `POST validator/aggregate_and_proofs`
|
||||
pub async fn post_validator_aggregate_and_proof<E: EthSpec>(
|
||||
/// `POST v1/validator/aggregate_and_proofs`
|
||||
pub async fn post_validator_aggregate_and_proof_v1<E: EthSpec>(
|
||||
&self,
|
||||
aggregates: &[SignedAggregateAndProof<E>],
|
||||
) -> Result<(), Error> {
|
||||
@@ -2353,6 +2479,30 @@ impl BeaconNodeHttpClient {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// `POST v2/validator/aggregate_and_proofs`
|
||||
pub async fn post_validator_aggregate_and_proof_v2<E: EthSpec>(
|
||||
&self,
|
||||
aggregates: &[SignedAggregateAndProof<E>],
|
||||
fork_name: ForkName,
|
||||
) -> Result<(), Error> {
|
||||
let mut path = self.eth_path(V2)?;
|
||||
|
||||
path.path_segments_mut()
|
||||
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
|
||||
.push("validator")
|
||||
.push("aggregate_and_proofs");
|
||||
|
||||
self.post_with_timeout_and_consensus_header(
|
||||
path,
|
||||
&aggregates,
|
||||
self.timeouts.attestation,
|
||||
fork_name,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// `POST validator/beacon_committee_subscriptions`
|
||||
pub async fn post_validator_beacon_committee_subscriptions(
|
||||
&self,
|
||||
|
||||
@@ -780,6 +780,8 @@ pub struct ValidatorAttestationDataQuery {
|
||||
pub struct ValidatorAggregateAttestationQuery {
|
||||
pub attestation_data_root: Hash256,
|
||||
pub slot: Slot,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub committee_index: Option<CommitteeIndex>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)]
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use crate::slot_data::SlotData;
|
||||
use crate::Checkpoint;
|
||||
use crate::{test_utils::TestRandom, Hash256, Slot};
|
||||
use crate::{Checkpoint, ForkVersionDeserialize};
|
||||
use derivative::Derivative;
|
||||
use safe_arith::ArithError;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -26,6 +26,12 @@ pub enum Error {
|
||||
InvalidCommitteeIndex,
|
||||
}
|
||||
|
||||
impl From<ssz_types::Error> for Error {
|
||||
fn from(e: ssz_types::Error) -> Self {
|
||||
Error::SszTypesError(e)
|
||||
}
|
||||
}
|
||||
|
||||
#[superstruct(
|
||||
variants(Base, Electra),
|
||||
variant_attributes(
|
||||
@@ -487,6 +493,46 @@ impl<'a, E: EthSpec> From<AttestationRefOnDisk<'a, E>> for AttestationRef<'a, E>
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: EthSpec> ForkVersionDeserialize for Attestation<E> {
|
||||
fn deserialize_by_fork<'de, D: serde::Deserializer<'de>>(
|
||||
value: serde_json::Value,
|
||||
fork_name: crate::ForkName,
|
||||
) -> Result<Self, D::Error> {
|
||||
if fork_name.electra_enabled() {
|
||||
let attestation: AttestationElectra<E> =
|
||||
serde_json::from_value(value).map_err(serde::de::Error::custom)?;
|
||||
Ok(Attestation::Electra(attestation))
|
||||
} else {
|
||||
let attestation: AttestationBase<E> =
|
||||
serde_json::from_value(value).map_err(serde::de::Error::custom)?;
|
||||
Ok(Attestation::Base(attestation))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: EthSpec> ForkVersionDeserialize for Vec<Attestation<E>> {
|
||||
fn deserialize_by_fork<'de, D: serde::Deserializer<'de>>(
|
||||
value: serde_json::Value,
|
||||
fork_name: crate::ForkName,
|
||||
) -> Result<Self, D::Error> {
|
||||
if fork_name.electra_enabled() {
|
||||
let attestations: Vec<AttestationElectra<E>> =
|
||||
serde_json::from_value(value).map_err(serde::de::Error::custom)?;
|
||||
Ok(attestations
|
||||
.into_iter()
|
||||
.map(Attestation::Electra)
|
||||
.collect::<Vec<_>>())
|
||||
} else {
|
||||
let attestations: Vec<AttestationBase<E>> =
|
||||
serde_json::from_value(value).map_err(serde::de::Error::custom)?;
|
||||
Ok(attestations
|
||||
.into_iter()
|
||||
.map(Attestation::Base)
|
||||
.collect::<Vec<_>>())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -171,6 +171,29 @@ impl<E: EthSpec> TestRandom for AttesterSlashing<E> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: EthSpec> crate::ForkVersionDeserialize for Vec<AttesterSlashing<E>> {
|
||||
fn deserialize_by_fork<'de, D: serde::Deserializer<'de>>(
|
||||
value: serde_json::Value,
|
||||
fork_name: crate::ForkName,
|
||||
) -> Result<Self, D::Error> {
|
||||
if fork_name.electra_enabled() {
|
||||
let slashings: Vec<AttesterSlashingElectra<E>> =
|
||||
serde_json::from_value(value).map_err(serde::de::Error::custom)?;
|
||||
Ok(slashings
|
||||
.into_iter()
|
||||
.map(AttesterSlashing::Electra)
|
||||
.collect::<Vec<_>>())
|
||||
} else {
|
||||
let slashings: Vec<AttesterSlashingBase<E>> =
|
||||
serde_json::from_value(value).map_err(serde::de::Error::custom)?;
|
||||
Ok(slashings
|
||||
.into_iter()
|
||||
.map(AttesterSlashing::Base)
|
||||
.collect::<Vec<_>>())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -287,17 +287,21 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
|
||||
// Then download, sign and publish a `SignedAggregateAndProof` for each
|
||||
// validator that is elected to aggregate for this `slot` and
|
||||
// `committee_index`.
|
||||
self.produce_and_publish_aggregates(&attestation_data, &validator_duties)
|
||||
.await
|
||||
.map_err(move |e| {
|
||||
crit!(
|
||||
log,
|
||||
"Error during attestation routine";
|
||||
"error" => format!("{:?}", e),
|
||||
"committee_index" => committee_index,
|
||||
"slot" => slot.as_u64(),
|
||||
)
|
||||
})?;
|
||||
self.produce_and_publish_aggregates(
|
||||
&attestation_data,
|
||||
committee_index,
|
||||
&validator_duties,
|
||||
)
|
||||
.await
|
||||
.map_err(move |e| {
|
||||
crit!(
|
||||
log,
|
||||
"Error during attestation routine";
|
||||
"error" => format!("{:?}", e),
|
||||
"committee_index" => committee_index,
|
||||
"slot" => slot.as_u64(),
|
||||
)
|
||||
})?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -445,6 +449,11 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
|
||||
warn!(log, "No attestations were published");
|
||||
return Ok(None);
|
||||
}
|
||||
let fork_name = self
|
||||
.context
|
||||
.eth2_config
|
||||
.spec
|
||||
.fork_name_at_slot::<E>(attestation_data.slot);
|
||||
|
||||
// Post the attestations to the BN.
|
||||
match self
|
||||
@@ -458,9 +467,15 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
|
||||
&metrics::ATTESTATION_SERVICE_TIMES,
|
||||
&[metrics::ATTESTATIONS_HTTP_POST],
|
||||
);
|
||||
beacon_node
|
||||
.post_beacon_pool_attestations(attestations)
|
||||
.await
|
||||
if fork_name.electra_enabled() {
|
||||
beacon_node
|
||||
.post_beacon_pool_attestations_v2(attestations, fork_name)
|
||||
.await
|
||||
} else {
|
||||
beacon_node
|
||||
.post_beacon_pool_attestations_v1(attestations)
|
||||
.await
|
||||
}
|
||||
},
|
||||
)
|
||||
.await
|
||||
@@ -504,6 +519,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
|
||||
async fn produce_and_publish_aggregates(
|
||||
&self,
|
||||
attestation_data: &AttestationData,
|
||||
committee_index: CommitteeIndex,
|
||||
validator_duties: &[DutyAndProof],
|
||||
) -> Result<(), String> {
|
||||
let log = self.context.log();
|
||||
@@ -516,6 +532,12 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let fork_name = self
|
||||
.context
|
||||
.eth2_config
|
||||
.spec
|
||||
.fork_name_at_slot::<E>(attestation_data.slot);
|
||||
|
||||
let aggregated_attestation = &self
|
||||
.beacon_nodes
|
||||
.first_success(
|
||||
@@ -526,17 +548,36 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
|
||||
&metrics::ATTESTATION_SERVICE_TIMES,
|
||||
&[metrics::AGGREGATES_HTTP_GET],
|
||||
);
|
||||
beacon_node
|
||||
.get_validator_aggregate_attestation(
|
||||
attestation_data.slot,
|
||||
attestation_data.tree_hash_root(),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
format!("Failed to produce an aggregate attestation: {:?}", e)
|
||||
})?
|
||||
.ok_or_else(|| format!("No aggregate available for {:?}", attestation_data))
|
||||
.map(|result| result.data)
|
||||
if fork_name.electra_enabled() {
|
||||
beacon_node
|
||||
.get_validator_aggregate_attestation_v2(
|
||||
attestation_data.slot,
|
||||
attestation_data.tree_hash_root(),
|
||||
committee_index,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
format!("Failed to produce an aggregate attestation: {:?}", e)
|
||||
})?
|
||||
.ok_or_else(|| {
|
||||
format!("No aggregate available for {:?}", attestation_data)
|
||||
})
|
||||
.map(|result| result.data)
|
||||
} else {
|
||||
beacon_node
|
||||
.get_validator_aggregate_attestation_v1(
|
||||
attestation_data.slot,
|
||||
attestation_data.tree_hash_root(),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
format!("Failed to produce an aggregate attestation: {:?}", e)
|
||||
})?
|
||||
.ok_or_else(|| {
|
||||
format!("No aggregate available for {:?}", attestation_data)
|
||||
})
|
||||
.map(|result| result.data)
|
||||
}
|
||||
},
|
||||
)
|
||||
.await
|
||||
@@ -604,9 +645,20 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
|
||||
&metrics::ATTESTATION_SERVICE_TIMES,
|
||||
&[metrics::AGGREGATES_HTTP_POST],
|
||||
);
|
||||
beacon_node
|
||||
.post_validator_aggregate_and_proof(signed_aggregate_and_proofs_slice)
|
||||
.await
|
||||
if fork_name.electra_enabled() {
|
||||
beacon_node
|
||||
.post_validator_aggregate_and_proof_v2(
|
||||
signed_aggregate_and_proofs_slice,
|
||||
fork_name,
|
||||
)
|
||||
.await
|
||||
} else {
|
||||
beacon_node
|
||||
.post_validator_aggregate_and_proof_v1(
|
||||
signed_aggregate_and_proofs_slice,
|
||||
)
|
||||
.await
|
||||
}
|
||||
},
|
||||
)
|
||||
.await
|
||||
|
||||
Reference in New Issue
Block a user