mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-07 00:42:42 +00:00
Move beacon pool http api to its own separate module (#8543)
Continuation of: * #8536 Moving `/beacon/pool` endpoints out of `http_api` to a separation module. This should improve code maintainability, incremental compilation time and rust analyzer performance. This is a tedious but straight forward change, so we're going with a pair & insta-merge approach to avoid painful & slow async review Co-Authored-By: Jimmy Chen <jchen.tc@gmail.com>
This commit is contained in:
@@ -1 +1,2 @@
|
|||||||
|
pub mod pool;
|
||||||
pub mod states;
|
pub mod states;
|
||||||
|
|||||||
522
beacon_node/http_api/src/beacon/pool.rs
Normal file
522
beacon_node/http_api/src/beacon/pool.rs
Normal file
@@ -0,0 +1,522 @@
|
|||||||
|
use crate::task_spawner::{Priority, TaskSpawner};
|
||||||
|
use crate::utils::{NetworkTxFilter, OptionalConsensusVersionHeaderFilter, ResponseFilter};
|
||||||
|
use crate::version::{
|
||||||
|
ResponseIncludesVersion, V1, V2, add_consensus_version_header, beacon_response,
|
||||||
|
unsupported_version_rejection,
|
||||||
|
};
|
||||||
|
use crate::{sync_committees, utils};
|
||||||
|
use beacon_chain::observed_operations::ObservationOutcome;
|
||||||
|
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||||
|
use eth2::types::{AttestationPoolQuery, EndpointVersion, Failure, GenericResponse};
|
||||||
|
use lighthouse_network::PubsubMessage;
|
||||||
|
use network::NetworkMessage;
|
||||||
|
use operation_pool::ReceivedPreCapella;
|
||||||
|
use slot_clock::SlotClock;
|
||||||
|
use std::collections::HashSet;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::mpsc::UnboundedSender;
|
||||||
|
use tracing::{debug, info, warn};
|
||||||
|
use types::{
|
||||||
|
Attestation, AttestationData, AttesterSlashing, ForkName, ProposerSlashing,
|
||||||
|
SignedBlsToExecutionChange, SignedVoluntaryExit, SingleAttestation, SyncCommitteeMessage,
|
||||||
|
};
|
||||||
|
use warp::filters::BoxedFilter;
|
||||||
|
use warp::{Filter, Reply};
|
||||||
|
use warp_utils::reject::convert_rejection;
|
||||||
|
|
||||||
|
pub type BeaconPoolPathFilter<T> = BoxedFilter<(
|
||||||
|
TaskSpawner<<T as BeaconChainTypes>::EthSpec>,
|
||||||
|
Arc<BeaconChain<T>>,
|
||||||
|
)>;
|
||||||
|
pub type BeaconPoolPathV2Filter<T> = BoxedFilter<(
|
||||||
|
TaskSpawner<<T as BeaconChainTypes>::EthSpec>,
|
||||||
|
Arc<BeaconChain<T>>,
|
||||||
|
)>;
|
||||||
|
pub type BeaconPoolPathAnyFilter<T> = BoxedFilter<(
|
||||||
|
EndpointVersion,
|
||||||
|
TaskSpawner<<T as BeaconChainTypes>::EthSpec>,
|
||||||
|
Arc<BeaconChain<T>>,
|
||||||
|
)>;
|
||||||
|
|
||||||
|
/// POST beacon/pool/bls_to_execution_changes
|
||||||
|
pub fn post_beacon_pool_bls_to_execution_changes<T: BeaconChainTypes>(
|
||||||
|
network_tx_filter: &NetworkTxFilter<T>,
|
||||||
|
beacon_pool_path: &BeaconPoolPathFilter<T>,
|
||||||
|
) -> ResponseFilter {
|
||||||
|
beacon_pool_path
|
||||||
|
.clone()
|
||||||
|
.and(warp::path("bls_to_execution_changes"))
|
||||||
|
.and(warp::path::end())
|
||||||
|
.and(warp_utils::json::json())
|
||||||
|
.and(network_tx_filter.clone())
|
||||||
|
.then(
|
||||||
|
|task_spawner: TaskSpawner<T::EthSpec>,
|
||||||
|
chain: Arc<BeaconChain<T>>,
|
||||||
|
address_changes: Vec<SignedBlsToExecutionChange>,
|
||||||
|
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| {
|
||||||
|
task_spawner.blocking_json_task(Priority::P0, move || {
|
||||||
|
let mut failures = vec![];
|
||||||
|
|
||||||
|
for (index, address_change) in address_changes.into_iter().enumerate() {
|
||||||
|
let validator_index = address_change.message.validator_index;
|
||||||
|
|
||||||
|
match chain.verify_bls_to_execution_change_for_http_api(address_change) {
|
||||||
|
Ok(ObservationOutcome::New(verified_address_change)) => {
|
||||||
|
let validator_index =
|
||||||
|
verified_address_change.as_inner().message.validator_index;
|
||||||
|
let address = verified_address_change
|
||||||
|
.as_inner()
|
||||||
|
.message
|
||||||
|
.to_execution_address;
|
||||||
|
|
||||||
|
// New to P2P *and* op pool, gossip immediately if post-Capella.
|
||||||
|
let received_pre_capella =
|
||||||
|
if chain.current_slot_is_post_capella().unwrap_or(false) {
|
||||||
|
ReceivedPreCapella::No
|
||||||
|
} else {
|
||||||
|
ReceivedPreCapella::Yes
|
||||||
|
};
|
||||||
|
if matches!(received_pre_capella, ReceivedPreCapella::No) {
|
||||||
|
utils::publish_pubsub_message(
|
||||||
|
&network_tx,
|
||||||
|
PubsubMessage::BlsToExecutionChange(Box::new(
|
||||||
|
verified_address_change.as_inner().clone(),
|
||||||
|
)),
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Import to op pool (may return `false` if there's a race).
|
||||||
|
let imported = chain.import_bls_to_execution_change(
|
||||||
|
verified_address_change,
|
||||||
|
received_pre_capella,
|
||||||
|
);
|
||||||
|
|
||||||
|
info!(
|
||||||
|
%validator_index,
|
||||||
|
?address,
|
||||||
|
published =
|
||||||
|
matches!(received_pre_capella, ReceivedPreCapella::No),
|
||||||
|
imported,
|
||||||
|
"Processed BLS to execution change"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Ok(ObservationOutcome::AlreadyKnown) => {
|
||||||
|
debug!(%validator_index, "BLS to execution change already known");
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!(
|
||||||
|
validator_index,
|
||||||
|
reason = ?e,
|
||||||
|
source = "HTTP",
|
||||||
|
"Invalid BLS to execution change"
|
||||||
|
);
|
||||||
|
failures.push(Failure::new(index, format!("invalid: {e:?}")));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if failures.is_empty() {
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(warp_utils::reject::indexed_bad_request(
|
||||||
|
"some BLS to execution changes failed to verify".into(),
|
||||||
|
failures,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.boxed()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// GET beacon/pool/bls_to_execution_changes
|
||||||
|
pub fn get_beacon_pool_bls_to_execution_changes<T: BeaconChainTypes>(
|
||||||
|
beacon_pool_path: &BeaconPoolPathFilter<T>,
|
||||||
|
) -> ResponseFilter {
|
||||||
|
beacon_pool_path
|
||||||
|
.clone()
|
||||||
|
.and(warp::path("bls_to_execution_changes"))
|
||||||
|
.and(warp::path::end())
|
||||||
|
.then(
|
||||||
|
|task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
|
||||||
|
task_spawner.blocking_json_task(Priority::P1, move || {
|
||||||
|
let address_changes = chain.op_pool.get_all_bls_to_execution_changes();
|
||||||
|
Ok(GenericResponse::from(address_changes))
|
||||||
|
})
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.boxed()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// POST beacon/pool/sync_committees
|
||||||
|
pub fn post_beacon_pool_sync_committees<T: BeaconChainTypes>(
|
||||||
|
network_tx_filter: &NetworkTxFilter<T>,
|
||||||
|
beacon_pool_path: &BeaconPoolPathFilter<T>,
|
||||||
|
) -> ResponseFilter {
|
||||||
|
beacon_pool_path
|
||||||
|
.clone()
|
||||||
|
.and(warp::path("sync_committees"))
|
||||||
|
.and(warp::path::end())
|
||||||
|
.and(warp_utils::json::json())
|
||||||
|
.and(network_tx_filter.clone())
|
||||||
|
.then(
|
||||||
|
|task_spawner: TaskSpawner<T::EthSpec>,
|
||||||
|
chain: Arc<BeaconChain<T>>,
|
||||||
|
signatures: Vec<SyncCommitteeMessage>,
|
||||||
|
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| {
|
||||||
|
task_spawner.blocking_json_task(Priority::P0, move || {
|
||||||
|
sync_committees::process_sync_committee_signatures(
|
||||||
|
signatures, network_tx, &chain,
|
||||||
|
)?;
|
||||||
|
Ok(GenericResponse::from(()))
|
||||||
|
})
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.boxed()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// GET beacon/pool/voluntary_exits
|
||||||
|
pub fn get_beacon_pool_voluntary_exits<T: BeaconChainTypes>(
|
||||||
|
beacon_pool_path: &BeaconPoolPathFilter<T>,
|
||||||
|
) -> ResponseFilter {
|
||||||
|
beacon_pool_path
|
||||||
|
.clone()
|
||||||
|
.and(warp::path("voluntary_exits"))
|
||||||
|
.and(warp::path::end())
|
||||||
|
.then(
|
||||||
|
|task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
|
||||||
|
task_spawner.blocking_json_task(Priority::P1, move || {
|
||||||
|
let attestations = chain.op_pool.get_all_voluntary_exits();
|
||||||
|
Ok(GenericResponse::from(attestations))
|
||||||
|
})
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.boxed()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// POST beacon/pool/voluntary_exits
|
||||||
|
pub fn post_beacon_pool_voluntary_exits<T: BeaconChainTypes>(
|
||||||
|
network_tx_filter: &NetworkTxFilter<T>,
|
||||||
|
beacon_pool_path: &BeaconPoolPathFilter<T>,
|
||||||
|
) -> ResponseFilter {
|
||||||
|
beacon_pool_path
|
||||||
|
.clone()
|
||||||
|
.and(warp::path("voluntary_exits"))
|
||||||
|
.and(warp::path::end())
|
||||||
|
.and(warp_utils::json::json())
|
||||||
|
.and(network_tx_filter.clone())
|
||||||
|
.then(
|
||||||
|
|task_spawner: TaskSpawner<T::EthSpec>,
|
||||||
|
chain: Arc<BeaconChain<T>>,
|
||||||
|
exit: SignedVoluntaryExit,
|
||||||
|
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| {
|
||||||
|
task_spawner.blocking_json_task(Priority::P0, move || {
|
||||||
|
let outcome = chain
|
||||||
|
.verify_voluntary_exit_for_gossip(exit.clone())
|
||||||
|
.map_err(|e| {
|
||||||
|
warp_utils::reject::object_invalid(format!(
|
||||||
|
"gossip verification failed: {:?}",
|
||||||
|
e
|
||||||
|
))
|
||||||
|
})?;
|
||||||
|
|
||||||
|
// Notify the validator monitor.
|
||||||
|
chain
|
||||||
|
.validator_monitor
|
||||||
|
.read()
|
||||||
|
.register_api_voluntary_exit(&exit.message);
|
||||||
|
|
||||||
|
if let ObservationOutcome::New(exit) = outcome {
|
||||||
|
utils::publish_pubsub_message(
|
||||||
|
&network_tx,
|
||||||
|
PubsubMessage::VoluntaryExit(Box::new(exit.clone().into_inner())),
|
||||||
|
)?;
|
||||||
|
|
||||||
|
chain.import_voluntary_exit(exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.boxed()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// GET beacon/pool/proposer_slashings
|
||||||
|
pub fn get_beacon_pool_proposer_slashings<T: BeaconChainTypes>(
|
||||||
|
beacon_pool_path: &BeaconPoolPathFilter<T>,
|
||||||
|
) -> ResponseFilter {
|
||||||
|
beacon_pool_path
|
||||||
|
.clone()
|
||||||
|
.and(warp::path("proposer_slashings"))
|
||||||
|
.and(warp::path::end())
|
||||||
|
.then(
|
||||||
|
|task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
|
||||||
|
task_spawner.blocking_json_task(Priority::P1, move || {
|
||||||
|
let attestations = chain.op_pool.get_all_proposer_slashings();
|
||||||
|
Ok(GenericResponse::from(attestations))
|
||||||
|
})
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.boxed()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// POST beacon/pool/proposer_slashings
|
||||||
|
pub fn post_beacon_pool_proposer_slashings<T: BeaconChainTypes>(
|
||||||
|
network_tx_filter: &NetworkTxFilter<T>,
|
||||||
|
beacon_pool_path: &BeaconPoolPathFilter<T>,
|
||||||
|
) -> ResponseFilter {
|
||||||
|
beacon_pool_path
|
||||||
|
.clone()
|
||||||
|
.and(warp::path("proposer_slashings"))
|
||||||
|
.and(warp::path::end())
|
||||||
|
.and(warp_utils::json::json())
|
||||||
|
.and(network_tx_filter.clone())
|
||||||
|
.then(
|
||||||
|
|task_spawner: TaskSpawner<T::EthSpec>,
|
||||||
|
chain: Arc<BeaconChain<T>>,
|
||||||
|
slashing: ProposerSlashing,
|
||||||
|
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| {
|
||||||
|
task_spawner.blocking_json_task(Priority::P0, move || {
|
||||||
|
let outcome = chain
|
||||||
|
.verify_proposer_slashing_for_gossip(slashing.clone())
|
||||||
|
.map_err(|e| {
|
||||||
|
warp_utils::reject::object_invalid(format!(
|
||||||
|
"gossip verification failed: {:?}",
|
||||||
|
e
|
||||||
|
))
|
||||||
|
})?;
|
||||||
|
|
||||||
|
// Notify the validator monitor.
|
||||||
|
chain
|
||||||
|
.validator_monitor
|
||||||
|
.read()
|
||||||
|
.register_api_proposer_slashing(&slashing);
|
||||||
|
|
||||||
|
if let ObservationOutcome::New(slashing) = outcome {
|
||||||
|
utils::publish_pubsub_message(
|
||||||
|
&network_tx,
|
||||||
|
PubsubMessage::ProposerSlashing(Box::new(
|
||||||
|
slashing.clone().into_inner(),
|
||||||
|
)),
|
||||||
|
)?;
|
||||||
|
|
||||||
|
chain.import_proposer_slashing(slashing);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.boxed()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// GET beacon/pool/attester_slashings
|
||||||
|
pub fn get_beacon_pool_attester_slashings<T: BeaconChainTypes>(
|
||||||
|
beacon_pool_path_any: &BeaconPoolPathAnyFilter<T>,
|
||||||
|
) -> ResponseFilter {
|
||||||
|
beacon_pool_path_any
|
||||||
|
.clone()
|
||||||
|
.and(warp::path("attester_slashings"))
|
||||||
|
.and(warp::path::end())
|
||||||
|
.then(
|
||||||
|
|endpoint_version: EndpointVersion,
|
||||||
|
task_spawner: TaskSpawner<T::EthSpec>,
|
||||||
|
chain: Arc<BeaconChain<T>>| {
|
||||||
|
task_spawner.blocking_response_task(Priority::P1, move || {
|
||||||
|
let slashings = chain.op_pool.get_all_attester_slashings();
|
||||||
|
|
||||||
|
// Use the current slot to find the fork version, and convert all messages to the
|
||||||
|
// current fork's format. This is to ensure consistent message types matching
|
||||||
|
// `Eth-Consensus-Version`.
|
||||||
|
let current_slot =
|
||||||
|
chain
|
||||||
|
.slot_clock
|
||||||
|
.now()
|
||||||
|
.ok_or(warp_utils::reject::custom_server_error(
|
||||||
|
"unable to read slot clock".to_string(),
|
||||||
|
))?;
|
||||||
|
let fork_name = chain.spec.fork_name_at_slot::<T::EthSpec>(current_slot);
|
||||||
|
let slashings = slashings
|
||||||
|
.into_iter()
|
||||||
|
.filter(|slashing| {
|
||||||
|
(fork_name.electra_enabled()
|
||||||
|
&& matches!(slashing, AttesterSlashing::Electra(_)))
|
||||||
|
|| (!fork_name.electra_enabled()
|
||||||
|
&& matches!(slashing, AttesterSlashing::Base(_)))
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
let require_version = match endpoint_version {
|
||||||
|
V1 => ResponseIncludesVersion::No,
|
||||||
|
V2 => ResponseIncludesVersion::Yes(fork_name),
|
||||||
|
_ => return Err(unsupported_version_rejection(endpoint_version)),
|
||||||
|
};
|
||||||
|
|
||||||
|
let res = beacon_response(require_version, &slashings);
|
||||||
|
Ok(add_consensus_version_header(
|
||||||
|
warp::reply::json(&res).into_response(),
|
||||||
|
fork_name,
|
||||||
|
))
|
||||||
|
})
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.boxed()
|
||||||
|
}
|
||||||
|
|
||||||
|
// POST beacon/pool/attester_slashings
|
||||||
|
pub fn post_beacon_pool_attester_slashings<T: BeaconChainTypes>(
|
||||||
|
network_tx_filter: &NetworkTxFilter<T>,
|
||||||
|
beacon_pool_path_any: &BeaconPoolPathAnyFilter<T>,
|
||||||
|
) -> ResponseFilter {
|
||||||
|
beacon_pool_path_any
|
||||||
|
.clone()
|
||||||
|
.and(warp::path("attester_slashings"))
|
||||||
|
.and(warp::path::end())
|
||||||
|
.and(warp_utils::json::json())
|
||||||
|
.and(network_tx_filter.clone())
|
||||||
|
.then(
|
||||||
|
// V1 and V2 are identical except V2 has a consensus version header in the request.
|
||||||
|
// We only require this header for SSZ deserialization, which isn't supported for
|
||||||
|
// this endpoint presently.
|
||||||
|
|_endpoint_version: EndpointVersion,
|
||||||
|
task_spawner: TaskSpawner<T::EthSpec>,
|
||||||
|
chain: Arc<BeaconChain<T>>,
|
||||||
|
slashing: AttesterSlashing<T::EthSpec>,
|
||||||
|
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| {
|
||||||
|
task_spawner.blocking_json_task(Priority::P0, move || {
|
||||||
|
let outcome = chain
|
||||||
|
.verify_attester_slashing_for_gossip(slashing.clone())
|
||||||
|
.map_err(|e| {
|
||||||
|
warp_utils::reject::object_invalid(format!(
|
||||||
|
"gossip verification failed: {:?}",
|
||||||
|
e
|
||||||
|
))
|
||||||
|
})?;
|
||||||
|
|
||||||
|
// Notify the validator monitor.
|
||||||
|
chain
|
||||||
|
.validator_monitor
|
||||||
|
.read()
|
||||||
|
.register_api_attester_slashing(slashing.to_ref());
|
||||||
|
|
||||||
|
if let ObservationOutcome::New(slashing) = outcome {
|
||||||
|
utils::publish_pubsub_message(
|
||||||
|
&network_tx,
|
||||||
|
PubsubMessage::AttesterSlashing(Box::new(
|
||||||
|
slashing.clone().into_inner(),
|
||||||
|
)),
|
||||||
|
)?;
|
||||||
|
|
||||||
|
chain.import_attester_slashing(slashing);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.boxed()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// GET beacon/pool/attestations?committee_index,slot
|
||||||
|
pub fn get_beacon_pool_attestations<T: BeaconChainTypes>(
|
||||||
|
beacon_pool_path_any: &BeaconPoolPathAnyFilter<T>,
|
||||||
|
) -> ResponseFilter {
|
||||||
|
beacon_pool_path_any
|
||||||
|
.clone()
|
||||||
|
.and(warp::path("attestations"))
|
||||||
|
.and(warp::path::end())
|
||||||
|
.and(warp::query::<AttestationPoolQuery>())
|
||||||
|
.then(
|
||||||
|
|endpoint_version: EndpointVersion,
|
||||||
|
task_spawner: TaskSpawner<T::EthSpec>,
|
||||||
|
chain: Arc<BeaconChain<T>>,
|
||||||
|
query: AttestationPoolQuery| {
|
||||||
|
task_spawner.blocking_response_task(Priority::P1, move || {
|
||||||
|
let query_filter = |data: &AttestationData, committee_indices: HashSet<u64>| {
|
||||||
|
query.slot.is_none_or(|slot| slot == data.slot)
|
||||||
|
&& query
|
||||||
|
.committee_index
|
||||||
|
.is_none_or(|index| committee_indices.contains(&index))
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut attestations = chain.op_pool.get_filtered_attestations(query_filter);
|
||||||
|
attestations.extend(
|
||||||
|
chain
|
||||||
|
.naive_aggregation_pool
|
||||||
|
.read()
|
||||||
|
.iter()
|
||||||
|
.filter(|&att| {
|
||||||
|
query_filter(att.data(), att.get_committee_indices_map())
|
||||||
|
})
|
||||||
|
.cloned(),
|
||||||
|
);
|
||||||
|
// Use the current slot to find the fork version, and convert all messages to the
|
||||||
|
// current fork's format. This is to ensure consistent message types matching
|
||||||
|
// `Eth-Consensus-Version`.
|
||||||
|
let current_slot =
|
||||||
|
chain
|
||||||
|
.slot_clock
|
||||||
|
.now()
|
||||||
|
.ok_or(warp_utils::reject::custom_server_error(
|
||||||
|
"unable to read slot clock".to_string(),
|
||||||
|
))?;
|
||||||
|
let fork_name = chain.spec.fork_name_at_slot::<T::EthSpec>(current_slot);
|
||||||
|
let attestations = attestations
|
||||||
|
.into_iter()
|
||||||
|
.filter(|att| {
|
||||||
|
(fork_name.electra_enabled() && matches!(att, Attestation::Electra(_)))
|
||||||
|
|| (!fork_name.electra_enabled()
|
||||||
|
&& matches!(att, Attestation::Base(_)))
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
let require_version = match endpoint_version {
|
||||||
|
V1 => ResponseIncludesVersion::No,
|
||||||
|
V2 => ResponseIncludesVersion::Yes(fork_name),
|
||||||
|
_ => return Err(unsupported_version_rejection(endpoint_version)),
|
||||||
|
};
|
||||||
|
|
||||||
|
let res = beacon_response(require_version, &attestations);
|
||||||
|
Ok(add_consensus_version_header(
|
||||||
|
warp::reply::json(&res).into_response(),
|
||||||
|
fork_name,
|
||||||
|
))
|
||||||
|
})
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.boxed()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn post_beacon_pool_attestations_v2<T: BeaconChainTypes>(
|
||||||
|
network_tx_filter: &NetworkTxFilter<T>,
|
||||||
|
optional_consensus_version_header_filter: OptionalConsensusVersionHeaderFilter,
|
||||||
|
beacon_pool_path_v2: &BeaconPoolPathV2Filter<T>,
|
||||||
|
) -> ResponseFilter {
|
||||||
|
beacon_pool_path_v2
|
||||||
|
.clone()
|
||||||
|
.and(warp::path("attestations"))
|
||||||
|
.and(warp::path::end())
|
||||||
|
.and(warp_utils::json::json::<Vec<SingleAttestation>>())
|
||||||
|
.and(optional_consensus_version_header_filter)
|
||||||
|
.and(network_tx_filter.clone())
|
||||||
|
.then(
|
||||||
|
|task_spawner: TaskSpawner<T::EthSpec>,
|
||||||
|
chain: Arc<BeaconChain<T>>,
|
||||||
|
attestations: Vec<SingleAttestation>,
|
||||||
|
_fork_name: Option<ForkName>,
|
||||||
|
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| async move {
|
||||||
|
let result = crate::publish_attestations::publish_attestations(
|
||||||
|
task_spawner,
|
||||||
|
chain,
|
||||||
|
attestations,
|
||||||
|
network_tx,
|
||||||
|
true,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map(|()| warp::reply::json(&()));
|
||||||
|
convert_rejection(result).await
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.boxed()
|
||||||
|
}
|
||||||
@@ -36,16 +36,14 @@ mod validator_inclusion;
|
|||||||
mod validators;
|
mod validators;
|
||||||
mod version;
|
mod version;
|
||||||
|
|
||||||
|
use crate::beacon::pool::*;
|
||||||
use crate::light_client::{get_light_client_bootstrap, get_light_client_updates};
|
use crate::light_client::{get_light_client_bootstrap, get_light_client_updates};
|
||||||
use crate::utils::{AnyVersionFilter, EthV1Filter};
|
use crate::utils::{AnyVersionFilter, EthV1Filter};
|
||||||
use crate::validator::post_validator_liveness_epoch;
|
use crate::validator::post_validator_liveness_epoch;
|
||||||
use crate::validator::*;
|
use crate::validator::*;
|
||||||
use crate::version::beacon_response;
|
use crate::version::beacon_response;
|
||||||
use beacon::states;
|
use beacon::states;
|
||||||
use beacon_chain::{
|
use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes, WhenSlotSkipped};
|
||||||
BeaconChain, BeaconChainError, BeaconChainTypes, WhenSlotSkipped,
|
|
||||||
observed_operations::ObservationOutcome,
|
|
||||||
};
|
|
||||||
use beacon_processor::BeaconProcessorSend;
|
use beacon_processor::BeaconProcessorSend;
|
||||||
pub use block_id::BlockId;
|
pub use block_id::BlockId;
|
||||||
use builder_states::get_next_withdrawals;
|
use builder_states::get_next_withdrawals;
|
||||||
@@ -62,12 +60,10 @@ use health_metrics::observe::Observe;
|
|||||||
use lighthouse_network::Enr;
|
use lighthouse_network::Enr;
|
||||||
use lighthouse_network::NetworkGlobals;
|
use lighthouse_network::NetworkGlobals;
|
||||||
use lighthouse_network::PeerId;
|
use lighthouse_network::PeerId;
|
||||||
use lighthouse_network::PubsubMessage;
|
|
||||||
use lighthouse_version::version_with_platform;
|
use lighthouse_version::version_with_platform;
|
||||||
use logging::{SSELoggingComponents, crit};
|
use logging::{SSELoggingComponents, crit};
|
||||||
use network::{NetworkMessage, NetworkSenders};
|
use network::{NetworkMessage, NetworkSenders};
|
||||||
use network_utils::enr_ext::EnrExt;
|
use network_utils::enr_ext::EnrExt;
|
||||||
use operation_pool::ReceivedPreCapella;
|
|
||||||
use parking_lot::RwLock;
|
use parking_lot::RwLock;
|
||||||
pub use publish_blocks::{
|
pub use publish_blocks::{
|
||||||
ProvenancedBlock, publish_blinded_block, publish_block, reconstruct_block,
|
ProvenancedBlock, publish_blinded_block, publish_block, reconstruct_block,
|
||||||
@@ -76,7 +72,6 @@ use serde::{Deserialize, Serialize};
|
|||||||
use slot_clock::SlotClock;
|
use slot_clock::SlotClock;
|
||||||
use ssz::Encode;
|
use ssz::Encode;
|
||||||
pub use state_id::StateId;
|
pub use state_id::StateId;
|
||||||
use std::collections::HashSet;
|
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
@@ -93,9 +88,8 @@ use tokio_stream::{
|
|||||||
};
|
};
|
||||||
use tracing::{debug, info, warn};
|
use tracing::{debug, info, warn};
|
||||||
use types::{
|
use types::{
|
||||||
Attestation, AttestationData, AttesterSlashing, BeaconStateError, Checkpoint, ConfigAndPreset,
|
BeaconStateError, Checkpoint, ConfigAndPreset, Epoch, EthSpec, ForkName, Hash256,
|
||||||
Epoch, EthSpec, ForkName, Hash256, ProposerSlashing, SignedBlindedBeaconBlock,
|
SignedBlindedBeaconBlock, Slot,
|
||||||
SignedBlsToExecutionChange, SignedVoluntaryExit, SingleAttestation, Slot, SyncCommitteeMessage,
|
|
||||||
};
|
};
|
||||||
use version::{
|
use version::{
|
||||||
ResponseIncludesVersion, V1, V2, add_consensus_version_header, add_ssz_content_type_header,
|
ResponseIncludesVersion, V1, V2, add_consensus_version_header, add_ssz_content_type_header,
|
||||||
@@ -106,7 +100,7 @@ use warp::Reply;
|
|||||||
use warp::hyper::Body;
|
use warp::hyper::Body;
|
||||||
use warp::sse::Event;
|
use warp::sse::Event;
|
||||||
use warp::{Filter, Rejection, http::Response};
|
use warp::{Filter, Rejection, http::Response};
|
||||||
use warp_utils::{query::multi_key_query, reject::convert_rejection, uor::UnifyingOrFilter};
|
use warp_utils::{query::multi_key_query, uor::UnifyingOrFilter};
|
||||||
|
|
||||||
const API_PREFIX: &str = "eth";
|
const API_PREFIX: &str = "eth";
|
||||||
|
|
||||||
@@ -804,10 +798,10 @@ pub fn serve<T: BeaconChainTypes>(
|
|||||||
* beacon/blocks
|
* beacon/blocks
|
||||||
*/
|
*/
|
||||||
let consensus_version_header_filter =
|
let consensus_version_header_filter =
|
||||||
warp::header::header::<ForkName>(CONSENSUS_VERSION_HEADER);
|
warp::header::header::<ForkName>(CONSENSUS_VERSION_HEADER).boxed();
|
||||||
|
|
||||||
let optional_consensus_version_header_filter =
|
let optional_consensus_version_header_filter =
|
||||||
warp::header::optional::<ForkName>(CONSENSUS_VERSION_HEADER);
|
warp::header::optional::<ForkName>(CONSENSUS_VERSION_HEADER).boxed();
|
||||||
|
|
||||||
// POST beacon/blocks
|
// POST beacon/blocks
|
||||||
let post_beacon_blocks = eth_v1
|
let post_beacon_blocks = eth_v1
|
||||||
@@ -816,7 +810,7 @@ pub fn serve<T: BeaconChainTypes>(
|
|||||||
.and(warp::path("blocks"))
|
.and(warp::path("blocks"))
|
||||||
.and(warp::path::end())
|
.and(warp::path::end())
|
||||||
.and(warp::body::json())
|
.and(warp::body::json())
|
||||||
.and(consensus_version_header_filter)
|
.and(consensus_version_header_filter.clone())
|
||||||
.and(task_spawner_filter.clone())
|
.and(task_spawner_filter.clone())
|
||||||
.and(chain_filter.clone())
|
.and(chain_filter.clone())
|
||||||
.and(network_tx_filter.clone())
|
.and(network_tx_filter.clone())
|
||||||
@@ -853,7 +847,7 @@ pub fn serve<T: BeaconChainTypes>(
|
|||||||
.and(warp::path("blocks"))
|
.and(warp::path("blocks"))
|
||||||
.and(warp::path::end())
|
.and(warp::path::end())
|
||||||
.and(warp::body::bytes())
|
.and(warp::body::bytes())
|
||||||
.and(consensus_version_header_filter)
|
.and(consensus_version_header_filter.clone())
|
||||||
.and(task_spawner_filter.clone())
|
.and(task_spawner_filter.clone())
|
||||||
.and(chain_filter.clone())
|
.and(chain_filter.clone())
|
||||||
.and(network_tx_filter.clone())
|
.and(network_tx_filter.clone())
|
||||||
@@ -891,7 +885,7 @@ pub fn serve<T: BeaconChainTypes>(
|
|||||||
.and(warp::query::<api_types::BroadcastValidationQuery>())
|
.and(warp::query::<api_types::BroadcastValidationQuery>())
|
||||||
.and(warp::path::end())
|
.and(warp::path::end())
|
||||||
.and(warp::body::json())
|
.and(warp::body::json())
|
||||||
.and(consensus_version_header_filter)
|
.and(consensus_version_header_filter.clone())
|
||||||
.and(task_spawner_filter.clone())
|
.and(task_spawner_filter.clone())
|
||||||
.and(chain_filter.clone())
|
.and(chain_filter.clone())
|
||||||
.and(network_tx_filter.clone())
|
.and(network_tx_filter.clone())
|
||||||
@@ -931,7 +925,7 @@ pub fn serve<T: BeaconChainTypes>(
|
|||||||
.and(warp::query::<api_types::BroadcastValidationQuery>())
|
.and(warp::query::<api_types::BroadcastValidationQuery>())
|
||||||
.and(warp::path::end())
|
.and(warp::path::end())
|
||||||
.and(warp::body::bytes())
|
.and(warp::body::bytes())
|
||||||
.and(consensus_version_header_filter)
|
.and(consensus_version_header_filter.clone())
|
||||||
.and(task_spawner_filter.clone())
|
.and(task_spawner_filter.clone())
|
||||||
.and(chain_filter.clone())
|
.and(chain_filter.clone())
|
||||||
.and(network_tx_filter.clone())
|
.and(network_tx_filter.clone())
|
||||||
@@ -1408,444 +1402,67 @@ pub fn serve<T: BeaconChainTypes>(
|
|||||||
.and(warp::path("beacon"))
|
.and(warp::path("beacon"))
|
||||||
.and(warp::path("pool"))
|
.and(warp::path("pool"))
|
||||||
.and(task_spawner_filter.clone())
|
.and(task_spawner_filter.clone())
|
||||||
.and(chain_filter.clone());
|
.and(chain_filter.clone())
|
||||||
|
.boxed();
|
||||||
|
|
||||||
let beacon_pool_path_v2 = eth_v2
|
let beacon_pool_path_v2 = eth_v2
|
||||||
.clone()
|
.clone()
|
||||||
.and(warp::path("beacon"))
|
.and(warp::path("beacon"))
|
||||||
.and(warp::path("pool"))
|
.and(warp::path("pool"))
|
||||||
.and(task_spawner_filter.clone())
|
.and(task_spawner_filter.clone())
|
||||||
.and(chain_filter.clone());
|
.and(chain_filter.clone())
|
||||||
|
.boxed();
|
||||||
|
|
||||||
let beacon_pool_path_any = any_version
|
let beacon_pool_path_any = any_version
|
||||||
.clone()
|
.clone()
|
||||||
.and(warp::path("beacon"))
|
.and(warp::path("beacon"))
|
||||||
.and(warp::path("pool"))
|
.and(warp::path("pool"))
|
||||||
.and(task_spawner_filter.clone())
|
.and(task_spawner_filter.clone())
|
||||||
.and(chain_filter.clone());
|
.and(chain_filter.clone())
|
||||||
|
.boxed();
|
||||||
|
|
||||||
let post_beacon_pool_attestations_v2 = beacon_pool_path_v2
|
let post_beacon_pool_attestations_v2 = post_beacon_pool_attestations_v2(
|
||||||
.clone()
|
&network_tx_filter,
|
||||||
.and(warp::path("attestations"))
|
optional_consensus_version_header_filter,
|
||||||
.and(warp::path::end())
|
&beacon_pool_path_v2,
|
||||||
.and(warp_utils::json::json::<Vec<SingleAttestation>>())
|
|
||||||
.and(optional_consensus_version_header_filter)
|
|
||||||
.and(network_tx_filter.clone())
|
|
||||||
.then(
|
|
||||||
|task_spawner: TaskSpawner<T::EthSpec>,
|
|
||||||
chain: Arc<BeaconChain<T>>,
|
|
||||||
attestations: Vec<SingleAttestation>,
|
|
||||||
_fork_name: Option<ForkName>,
|
|
||||||
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| async move {
|
|
||||||
let result = crate::publish_attestations::publish_attestations(
|
|
||||||
task_spawner,
|
|
||||||
chain,
|
|
||||||
attestations,
|
|
||||||
network_tx,
|
|
||||||
true,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.map(|()| warp::reply::json(&()));
|
|
||||||
convert_rejection(result).await
|
|
||||||
},
|
|
||||||
);
|
);
|
||||||
|
|
||||||
// GET beacon/pool/attestations?committee_index,slot
|
// GET beacon/pool/attestations?committee_index,slot
|
||||||
let get_beacon_pool_attestations = beacon_pool_path_any
|
let get_beacon_pool_attestations = get_beacon_pool_attestations(&beacon_pool_path_any);
|
||||||
.clone()
|
|
||||||
.and(warp::path("attestations"))
|
|
||||||
.and(warp::path::end())
|
|
||||||
.and(warp::query::<api_types::AttestationPoolQuery>())
|
|
||||||
.then(
|
|
||||||
|endpoint_version: EndpointVersion,
|
|
||||||
task_spawner: TaskSpawner<T::EthSpec>,
|
|
||||||
chain: Arc<BeaconChain<T>>,
|
|
||||||
query: api_types::AttestationPoolQuery| {
|
|
||||||
task_spawner.blocking_response_task(Priority::P1, move || {
|
|
||||||
let query_filter = |data: &AttestationData, committee_indices: HashSet<u64>| {
|
|
||||||
query.slot.is_none_or(|slot| slot == data.slot)
|
|
||||||
&& query
|
|
||||||
.committee_index
|
|
||||||
.is_none_or(|index| committee_indices.contains(&index))
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut attestations = chain.op_pool.get_filtered_attestations(query_filter);
|
|
||||||
attestations.extend(
|
|
||||||
chain
|
|
||||||
.naive_aggregation_pool
|
|
||||||
.read()
|
|
||||||
.iter()
|
|
||||||
.filter(|&att| {
|
|
||||||
query_filter(att.data(), att.get_committee_indices_map())
|
|
||||||
})
|
|
||||||
.cloned(),
|
|
||||||
);
|
|
||||||
// Use the current slot to find the fork version, and convert all messages to the
|
|
||||||
// current fork's format. This is to ensure consistent message types matching
|
|
||||||
// `Eth-Consensus-Version`.
|
|
||||||
let current_slot =
|
|
||||||
chain
|
|
||||||
.slot_clock
|
|
||||||
.now()
|
|
||||||
.ok_or(warp_utils::reject::custom_server_error(
|
|
||||||
"unable to read slot clock".to_string(),
|
|
||||||
))?;
|
|
||||||
let fork_name = chain.spec.fork_name_at_slot::<T::EthSpec>(current_slot);
|
|
||||||
let attestations = attestations
|
|
||||||
.into_iter()
|
|
||||||
.filter(|att| {
|
|
||||||
(fork_name.electra_enabled() && matches!(att, Attestation::Electra(_)))
|
|
||||||
|| (!fork_name.electra_enabled()
|
|
||||||
&& matches!(att, Attestation::Base(_)))
|
|
||||||
})
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
|
|
||||||
let require_version = match endpoint_version {
|
|
||||||
V1 => ResponseIncludesVersion::No,
|
|
||||||
V2 => ResponseIncludesVersion::Yes(fork_name),
|
|
||||||
_ => return Err(unsupported_version_rejection(endpoint_version)),
|
|
||||||
};
|
|
||||||
|
|
||||||
let res = beacon_response(require_version, &attestations);
|
|
||||||
Ok(add_consensus_version_header(
|
|
||||||
warp::reply::json(&res).into_response(),
|
|
||||||
fork_name,
|
|
||||||
))
|
|
||||||
})
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
// POST beacon/pool/attester_slashings
|
// POST beacon/pool/attester_slashings
|
||||||
let post_beacon_pool_attester_slashings = beacon_pool_path_any
|
let post_beacon_pool_attester_slashings =
|
||||||
.clone()
|
post_beacon_pool_attester_slashings(&network_tx_filter, &beacon_pool_path_any);
|
||||||
.and(warp::path("attester_slashings"))
|
|
||||||
.and(warp::path::end())
|
|
||||||
.and(warp_utils::json::json())
|
|
||||||
.and(network_tx_filter.clone())
|
|
||||||
.then(
|
|
||||||
// V1 and V2 are identical except V2 has a consensus version header in the request.
|
|
||||||
// We only require this header for SSZ deserialization, which isn't supported for
|
|
||||||
// this endpoint presently.
|
|
||||||
|_endpoint_version: EndpointVersion,
|
|
||||||
task_spawner: TaskSpawner<T::EthSpec>,
|
|
||||||
chain: Arc<BeaconChain<T>>,
|
|
||||||
slashing: AttesterSlashing<T::EthSpec>,
|
|
||||||
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| {
|
|
||||||
task_spawner.blocking_json_task(Priority::P0, move || {
|
|
||||||
let outcome = chain
|
|
||||||
.verify_attester_slashing_for_gossip(slashing.clone())
|
|
||||||
.map_err(|e| {
|
|
||||||
warp_utils::reject::object_invalid(format!(
|
|
||||||
"gossip verification failed: {:?}",
|
|
||||||
e
|
|
||||||
))
|
|
||||||
})?;
|
|
||||||
|
|
||||||
// Notify the validator monitor.
|
|
||||||
chain
|
|
||||||
.validator_monitor
|
|
||||||
.read()
|
|
||||||
.register_api_attester_slashing(slashing.to_ref());
|
|
||||||
|
|
||||||
if let ObservationOutcome::New(slashing) = outcome {
|
|
||||||
utils::publish_pubsub_message(
|
|
||||||
&network_tx,
|
|
||||||
PubsubMessage::AttesterSlashing(Box::new(
|
|
||||||
slashing.clone().into_inner(),
|
|
||||||
)),
|
|
||||||
)?;
|
|
||||||
|
|
||||||
chain.import_attester_slashing(slashing);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
})
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
// GET beacon/pool/attester_slashings
|
// GET beacon/pool/attester_slashings
|
||||||
let get_beacon_pool_attester_slashings =
|
let get_beacon_pool_attester_slashings =
|
||||||
beacon_pool_path_any
|
get_beacon_pool_attester_slashings(&beacon_pool_path_any);
|
||||||
.clone()
|
|
||||||
.and(warp::path("attester_slashings"))
|
|
||||||
.and(warp::path::end())
|
|
||||||
.then(
|
|
||||||
|endpoint_version: EndpointVersion,
|
|
||||||
task_spawner: TaskSpawner<T::EthSpec>,
|
|
||||||
chain: Arc<BeaconChain<T>>| {
|
|
||||||
task_spawner.blocking_response_task(Priority::P1, move || {
|
|
||||||
let slashings = chain.op_pool.get_all_attester_slashings();
|
|
||||||
|
|
||||||
// Use the current slot to find the fork version, and convert all messages to the
|
|
||||||
// current fork's format. This is to ensure consistent message types matching
|
|
||||||
// `Eth-Consensus-Version`.
|
|
||||||
let current_slot = chain.slot_clock.now().ok_or(
|
|
||||||
warp_utils::reject::custom_server_error(
|
|
||||||
"unable to read slot clock".to_string(),
|
|
||||||
),
|
|
||||||
)?;
|
|
||||||
let fork_name = chain.spec.fork_name_at_slot::<T::EthSpec>(current_slot);
|
|
||||||
let slashings = slashings
|
|
||||||
.into_iter()
|
|
||||||
.filter(|slashing| {
|
|
||||||
(fork_name.electra_enabled()
|
|
||||||
&& matches!(slashing, AttesterSlashing::Electra(_)))
|
|
||||||
|| (!fork_name.electra_enabled()
|
|
||||||
&& matches!(slashing, AttesterSlashing::Base(_)))
|
|
||||||
})
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
|
|
||||||
let require_version = match endpoint_version {
|
|
||||||
V1 => ResponseIncludesVersion::No,
|
|
||||||
V2 => ResponseIncludesVersion::Yes(fork_name),
|
|
||||||
_ => return Err(unsupported_version_rejection(endpoint_version)),
|
|
||||||
};
|
|
||||||
|
|
||||||
let res = beacon_response(require_version, &slashings);
|
|
||||||
Ok(add_consensus_version_header(
|
|
||||||
warp::reply::json(&res).into_response(),
|
|
||||||
fork_name,
|
|
||||||
))
|
|
||||||
})
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
// POST beacon/pool/proposer_slashings
|
// POST beacon/pool/proposer_slashings
|
||||||
let post_beacon_pool_proposer_slashings = beacon_pool_path
|
let post_beacon_pool_proposer_slashings =
|
||||||
.clone()
|
post_beacon_pool_proposer_slashings(&network_tx_filter, &beacon_pool_path);
|
||||||
.and(warp::path("proposer_slashings"))
|
|
||||||
.and(warp::path::end())
|
|
||||||
.and(warp_utils::json::json())
|
|
||||||
.and(network_tx_filter.clone())
|
|
||||||
.then(
|
|
||||||
|task_spawner: TaskSpawner<T::EthSpec>,
|
|
||||||
chain: Arc<BeaconChain<T>>,
|
|
||||||
slashing: ProposerSlashing,
|
|
||||||
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| {
|
|
||||||
task_spawner.blocking_json_task(Priority::P0, move || {
|
|
||||||
let outcome = chain
|
|
||||||
.verify_proposer_slashing_for_gossip(slashing.clone())
|
|
||||||
.map_err(|e| {
|
|
||||||
warp_utils::reject::object_invalid(format!(
|
|
||||||
"gossip verification failed: {:?}",
|
|
||||||
e
|
|
||||||
))
|
|
||||||
})?;
|
|
||||||
|
|
||||||
// Notify the validator monitor.
|
|
||||||
chain
|
|
||||||
.validator_monitor
|
|
||||||
.read()
|
|
||||||
.register_api_proposer_slashing(&slashing);
|
|
||||||
|
|
||||||
if let ObservationOutcome::New(slashing) = outcome {
|
|
||||||
utils::publish_pubsub_message(
|
|
||||||
&network_tx,
|
|
||||||
PubsubMessage::ProposerSlashing(Box::new(
|
|
||||||
slashing.clone().into_inner(),
|
|
||||||
)),
|
|
||||||
)?;
|
|
||||||
|
|
||||||
chain.import_proposer_slashing(slashing);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
})
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
// GET beacon/pool/proposer_slashings
|
// GET beacon/pool/proposer_slashings
|
||||||
let get_beacon_pool_proposer_slashings = beacon_pool_path
|
let get_beacon_pool_proposer_slashings = get_beacon_pool_proposer_slashings(&beacon_pool_path);
|
||||||
.clone()
|
|
||||||
.and(warp::path("proposer_slashings"))
|
|
||||||
.and(warp::path::end())
|
|
||||||
.then(
|
|
||||||
|task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
|
|
||||||
task_spawner.blocking_json_task(Priority::P1, move || {
|
|
||||||
let attestations = chain.op_pool.get_all_proposer_slashings();
|
|
||||||
Ok(api_types::GenericResponse::from(attestations))
|
|
||||||
})
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
// POST beacon/pool/voluntary_exits
|
// POST beacon/pool/voluntary_exits
|
||||||
let post_beacon_pool_voluntary_exits = beacon_pool_path
|
let post_beacon_pool_voluntary_exits =
|
||||||
.clone()
|
post_beacon_pool_voluntary_exits(&network_tx_filter, &beacon_pool_path);
|
||||||
.and(warp::path("voluntary_exits"))
|
|
||||||
.and(warp::path::end())
|
|
||||||
.and(warp_utils::json::json())
|
|
||||||
.and(network_tx_filter.clone())
|
|
||||||
.then(
|
|
||||||
|task_spawner: TaskSpawner<T::EthSpec>,
|
|
||||||
chain: Arc<BeaconChain<T>>,
|
|
||||||
exit: SignedVoluntaryExit,
|
|
||||||
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| {
|
|
||||||
task_spawner.blocking_json_task(Priority::P0, move || {
|
|
||||||
let outcome = chain
|
|
||||||
.verify_voluntary_exit_for_gossip(exit.clone())
|
|
||||||
.map_err(|e| {
|
|
||||||
warp_utils::reject::object_invalid(format!(
|
|
||||||
"gossip verification failed: {:?}",
|
|
||||||
e
|
|
||||||
))
|
|
||||||
})?;
|
|
||||||
|
|
||||||
// Notify the validator monitor.
|
|
||||||
chain
|
|
||||||
.validator_monitor
|
|
||||||
.read()
|
|
||||||
.register_api_voluntary_exit(&exit.message);
|
|
||||||
|
|
||||||
if let ObservationOutcome::New(exit) = outcome {
|
|
||||||
utils::publish_pubsub_message(
|
|
||||||
&network_tx,
|
|
||||||
PubsubMessage::VoluntaryExit(Box::new(exit.clone().into_inner())),
|
|
||||||
)?;
|
|
||||||
|
|
||||||
chain.import_voluntary_exit(exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
})
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
// GET beacon/pool/voluntary_exits
|
// GET beacon/pool/voluntary_exits
|
||||||
let get_beacon_pool_voluntary_exits = beacon_pool_path
|
let get_beacon_pool_voluntary_exits = get_beacon_pool_voluntary_exits(&beacon_pool_path);
|
||||||
.clone()
|
|
||||||
.and(warp::path("voluntary_exits"))
|
|
||||||
.and(warp::path::end())
|
|
||||||
.then(
|
|
||||||
|task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
|
|
||||||
task_spawner.blocking_json_task(Priority::P1, move || {
|
|
||||||
let attestations = chain.op_pool.get_all_voluntary_exits();
|
|
||||||
Ok(api_types::GenericResponse::from(attestations))
|
|
||||||
})
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
// POST beacon/pool/sync_committees
|
// POST beacon/pool/sync_committees
|
||||||
let post_beacon_pool_sync_committees = beacon_pool_path
|
let post_beacon_pool_sync_committees =
|
||||||
.clone()
|
post_beacon_pool_sync_committees(&network_tx_filter, &beacon_pool_path);
|
||||||
.and(warp::path("sync_committees"))
|
|
||||||
.and(warp::path::end())
|
|
||||||
.and(warp_utils::json::json())
|
|
||||||
.and(network_tx_filter.clone())
|
|
||||||
.then(
|
|
||||||
|task_spawner: TaskSpawner<T::EthSpec>,
|
|
||||||
chain: Arc<BeaconChain<T>>,
|
|
||||||
signatures: Vec<SyncCommitteeMessage>,
|
|
||||||
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| {
|
|
||||||
task_spawner.blocking_json_task(Priority::P0, move || {
|
|
||||||
sync_committees::process_sync_committee_signatures(
|
|
||||||
signatures, network_tx, &chain,
|
|
||||||
)?;
|
|
||||||
Ok(api_types::GenericResponse::from(()))
|
|
||||||
})
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
// GET beacon/pool/bls_to_execution_changes
|
// GET beacon/pool/bls_to_execution_changes
|
||||||
let get_beacon_pool_bls_to_execution_changes = beacon_pool_path
|
let get_beacon_pool_bls_to_execution_changes =
|
||||||
.clone()
|
get_beacon_pool_bls_to_execution_changes(&beacon_pool_path);
|
||||||
.and(warp::path("bls_to_execution_changes"))
|
|
||||||
.and(warp::path::end())
|
|
||||||
.then(
|
|
||||||
|task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
|
|
||||||
task_spawner.blocking_json_task(Priority::P1, move || {
|
|
||||||
let address_changes = chain.op_pool.get_all_bls_to_execution_changes();
|
|
||||||
Ok(api_types::GenericResponse::from(address_changes))
|
|
||||||
})
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
// POST beacon/pool/bls_to_execution_changes
|
// POST beacon/pool/bls_to_execution_changes
|
||||||
let post_beacon_pool_bls_to_execution_changes = beacon_pool_path
|
let post_beacon_pool_bls_to_execution_changes =
|
||||||
.clone()
|
post_beacon_pool_bls_to_execution_changes(&network_tx_filter, &beacon_pool_path);
|
||||||
.and(warp::path("bls_to_execution_changes"))
|
|
||||||
.and(warp::path::end())
|
|
||||||
.and(warp_utils::json::json())
|
|
||||||
.and(network_tx_filter.clone())
|
|
||||||
.then(
|
|
||||||
|task_spawner: TaskSpawner<T::EthSpec>,
|
|
||||||
chain: Arc<BeaconChain<T>>,
|
|
||||||
address_changes: Vec<SignedBlsToExecutionChange>,
|
|
||||||
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| {
|
|
||||||
task_spawner.blocking_json_task(Priority::P0, move || {
|
|
||||||
let mut failures = vec![];
|
|
||||||
|
|
||||||
for (index, address_change) in address_changes.into_iter().enumerate() {
|
|
||||||
let validator_index = address_change.message.validator_index;
|
|
||||||
|
|
||||||
match chain.verify_bls_to_execution_change_for_http_api(address_change) {
|
|
||||||
Ok(ObservationOutcome::New(verified_address_change)) => {
|
|
||||||
let validator_index =
|
|
||||||
verified_address_change.as_inner().message.validator_index;
|
|
||||||
let address = verified_address_change
|
|
||||||
.as_inner()
|
|
||||||
.message
|
|
||||||
.to_execution_address;
|
|
||||||
|
|
||||||
// New to P2P *and* op pool, gossip immediately if post-Capella.
|
|
||||||
let received_pre_capella =
|
|
||||||
if chain.current_slot_is_post_capella().unwrap_or(false) {
|
|
||||||
ReceivedPreCapella::No
|
|
||||||
} else {
|
|
||||||
ReceivedPreCapella::Yes
|
|
||||||
};
|
|
||||||
if matches!(received_pre_capella, ReceivedPreCapella::No) {
|
|
||||||
utils::publish_pubsub_message(
|
|
||||||
&network_tx,
|
|
||||||
PubsubMessage::BlsToExecutionChange(Box::new(
|
|
||||||
verified_address_change.as_inner().clone(),
|
|
||||||
)),
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Import to op pool (may return `false` if there's a race).
|
|
||||||
let imported = chain.import_bls_to_execution_change(
|
|
||||||
verified_address_change,
|
|
||||||
received_pre_capella,
|
|
||||||
);
|
|
||||||
|
|
||||||
info!(
|
|
||||||
%validator_index,
|
|
||||||
?address,
|
|
||||||
published =
|
|
||||||
matches!(received_pre_capella, ReceivedPreCapella::No),
|
|
||||||
imported,
|
|
||||||
"Processed BLS to execution change"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
Ok(ObservationOutcome::AlreadyKnown) => {
|
|
||||||
debug!(%validator_index, "BLS to execution change already known");
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
warn!(
|
|
||||||
validator_index,
|
|
||||||
reason = ?e,
|
|
||||||
source = "HTTP",
|
|
||||||
"Invalid BLS to execution change"
|
|
||||||
);
|
|
||||||
failures.push(api_types::Failure::new(
|
|
||||||
index,
|
|
||||||
format!("invalid: {e:?}"),
|
|
||||||
));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if failures.is_empty() {
|
|
||||||
Ok(())
|
|
||||||
} else {
|
|
||||||
Err(warp_utils::reject::indexed_bad_request(
|
|
||||||
"some BLS to execution changes failed to verify".into(),
|
|
||||||
failures,
|
|
||||||
))
|
|
||||||
}
|
|
||||||
})
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
let beacon_rewards_path = eth_v1
|
let beacon_rewards_path = eth_v1
|
||||||
.clone()
|
.clone()
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ use network::{NetworkMessage, ValidatorSubscriptionMessage};
|
|||||||
use parking_lot::RwLock;
|
use parking_lot::RwLock;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::mpsc::{Sender, UnboundedSender};
|
use tokio::sync::mpsc::{Sender, UnboundedSender};
|
||||||
use types::{ChainSpec, EthSpec};
|
use types::{ChainSpec, EthSpec, ForkName};
|
||||||
use warp::Rejection;
|
use warp::Rejection;
|
||||||
use warp::filters::BoxedFilter;
|
use warp::filters::BoxedFilter;
|
||||||
|
|
||||||
@@ -20,6 +20,7 @@ pub type TaskSpawnerFilter<T> = BoxedFilter<(TaskSpawner<<T as BeaconChainTypes>
|
|||||||
pub type ValidatorSubscriptionTxFilter = BoxedFilter<(Sender<ValidatorSubscriptionMessage>,)>;
|
pub type ValidatorSubscriptionTxFilter = BoxedFilter<(Sender<ValidatorSubscriptionMessage>,)>;
|
||||||
pub type NetworkTxFilter<T> =
|
pub type NetworkTxFilter<T> =
|
||||||
BoxedFilter<(UnboundedSender<NetworkMessage<<T as BeaconChainTypes>::EthSpec>>,)>;
|
BoxedFilter<(UnboundedSender<NetworkMessage<<T as BeaconChainTypes>::EthSpec>>,)>;
|
||||||
|
pub type OptionalConsensusVersionHeaderFilter = BoxedFilter<(Option<ForkName>,)>;
|
||||||
|
|
||||||
pub fn from_meta_data<E: EthSpec>(
|
pub fn from_meta_data<E: EthSpec>(
|
||||||
meta_data: &RwLock<MetaData<E>>,
|
meta_data: &RwLock<MetaData<E>>,
|
||||||
|
|||||||
Reference in New Issue
Block a user