POST inclusion list endpoint

This commit is contained in:
Eitan Seri-Levi
2025-02-02 12:37:21 +03:00
parent 3c90258468
commit 88afbb39be
7 changed files with 302 additions and 16 deletions

View File

@@ -20,6 +20,7 @@ mod produce_block;
mod proposer_duties;
mod publish_attestations;
mod publish_blocks;
mod publish_inclusion_lists;
mod standard_block_rewards;
mod state_id;
mod sync_committee_rewards;
@@ -2318,30 +2319,33 @@ pub fn serve<T: BeaconChainTypes>(
);
// POST beacon/pool/inclusion_lists
// TODO(focil) unused endpoint and variables
let _post_beacon_pool_inclusion_lists = beacon_pool_path
let post_beacon_pool_inclusion_lists = beacon_pool_path
.clone()
.and(warp::path("inclusion_lists"))
.and(warp::path::end())
.and(warp_utils::json::json())
.and(network_tx_filter.clone())
.and(reprocess_send_filter.clone())
.and(log_filter.clone())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
_chain: Arc<BeaconChain<T>>,
chain: Arc<BeaconChain<T>>,
inclusion_lists: Vec<SignedInclusionList<T::EthSpec>>,
_network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.blocking_json_task(Priority::P0, move || {
// TODO(focil): actually gossip the inclusion lists
info!(
log,
"Posting signed inclusion lists for gossip";
"num_inclusion_lists" => inclusion_lists.len(),
);
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
reprocess_tx: Option<Sender<ReprocessQueueMessage>>,
log: Logger| async move {
let result = crate::publish_inclusion_lists::publish_inclusion_lists(
task_spawner,
chain,
inclusion_lists,
network_tx,
reprocess_tx,
log,
)
.await
.map(|()| warp::reply::json(&()));
Ok(())
})
convert_rejection(result).await
},
);
@@ -4913,6 +4917,7 @@ pub fn serve<T: BeaconChainTypes>(
.uor(post_lighthouse_block_rewards)
.uor(post_lighthouse_ui_validator_metrics)
.uor(post_lighthouse_ui_validator_info)
.uor(post_beacon_pool_inclusion_lists)
.recover(warp_utils::reject::handle_rejection),
),
)

View File

@@ -0,0 +1,217 @@
use std::{sync::Arc, time::Duration};
use beacon_chain::inclusion_list_verification::GossipInclusionListError;
use beacon_chain::{validator_monitor::timestamp_now, BeaconChain, BeaconChainTypes};
use beacon_processor::work_reprocessing_queue::ReprocessQueueMessage;
use eth2::types::Failure;
use lighthouse_network::PubsubMessage;
use network::NetworkMessage;
use slog::{debug, error, info, Logger};
use tokio::sync::{
mpsc::{Sender, UnboundedSender},
oneshot,
};
use types::SignedInclusionList;
use crate::task_spawner::{Priority, TaskSpawner};
enum PublishInclusionListResult {
Success,
#[allow(dead_code)]
Reprocessing(oneshot::Receiver<Result<(), Error>>),
Failure(Error),
AlreadyKnown,
}
#[derive(Debug)]
pub enum Error {
Validation(GossipInclusionListError),
Publication,
ReprocessTimeout,
}
pub async fn publish_inclusion_lists<T: BeaconChainTypes>(
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
inclusion_lists: Vec<SignedInclusionList<T::EthSpec>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
_reprocess_send: Option<Sender<ReprocessQueueMessage>>,
log: Logger,
) -> Result<(), warp::Rejection> {
// Gossip validate and publish inclusion lists that can be immediately processed.
let seen_timestamp = timestamp_now();
let inner_log = log.clone();
let inclusion_list_metadata = inclusion_lists
.iter()
.map(|inclusion_list| {
(
inclusion_list.message.slot,
inclusion_list.message.validator_index,
)
})
.collect::<Vec<_>>();
let mut prelim_results = task_spawner
.blocking_task(Priority::P0, move || {
Ok(inclusion_lists
.into_iter()
.map(move |inclusion_list| {
match verify_and_publish_inclusion_list(
&chain,
&inclusion_list,
seen_timestamp,
&network_tx,
&inner_log,
) {
Ok(()) => PublishInclusionListResult::Success,
Err(e) => PublishInclusionListResult::Failure(e),
}
})
.map(Some)
.collect::<Vec<_>>())
})
.await?;
let (reprocess_indices, reprocess_futures): (Vec<_>, Vec<_>) = prelim_results
.iter_mut()
.enumerate()
.filter_map(|(i, opt_result)| {
if let Some(PublishInclusionListResult::Reprocessing(..)) = &opt_result {
let PublishInclusionListResult::Reprocessing(rx) = opt_result.take()? else {
// Unreachable.
return None;
};
Some((i, rx))
} else {
None
}
})
.unzip();
let reprocess_results = futures::future::join_all(reprocess_futures).await;
// Join everything back together and construct a response.
// This part should be quick so we just stay in the Tokio executor's async task.
for (i, reprocess_result) in reprocess_indices.into_iter().zip(reprocess_results) {
let Some(result_entry) = prelim_results.get_mut(i) else {
error!(
log,
"Unreachable case in inclusion list publishing";
"case" => "prelim out of bounds",
"request_index" => i,
);
continue;
};
*result_entry = Some(match reprocess_result {
Ok(Ok(())) => PublishInclusionListResult::Success,
// Inclusion list already known
Ok(Err(Error::Validation(GossipInclusionListError::PriorInclusionListKnown))) => {
PublishInclusionListResult::AlreadyKnown
}
Ok(Err(e)) => PublishInclusionListResult::Failure(e),
// Oneshot was dropped, indicating that the inclusion list either timed out in the
// reprocess queue or was dropped due to some error.
Err(_) => PublishInclusionListResult::Failure(Error::ReprocessTimeout),
});
}
// Construct the response.
let mut failures = vec![];
let mut num_already_known = 0;
for (index, result) in prelim_results.iter().enumerate() {
match result {
Some(PublishInclusionListResult::Success) => {}
Some(PublishInclusionListResult::AlreadyKnown) => num_already_known += 1,
Some(PublishInclusionListResult::Failure(e)) => {
if let Some((slot, validator_index)) = inclusion_list_metadata.get(index) {
error!(
log,
"Failure verifying attestation for gossip";
"error" => ?e,
"request_index" => index,
"validator_index" => validator_index,
"inclusion_list_slot" => slot,
);
failures.push(Failure::new(index, format!("{e:?}")));
} else {
error!(
log,
"Unreachable case in inclusion list publishing";
"case" => "out of bounds",
"request_index" => index
);
failures.push(Failure::new(index, "metadata logic error".into()));
}
}
Some(PublishInclusionListResult::Reprocessing(_)) => {
// TODO(focil) reprocessing
info!(
log,
"Reprocessing result";
);
}
None => {
error!(
log,
"Unreachable case in inclusion list publishing";
"case" => "result is None",
"request_index" => index
);
failures.push(Failure::new(index, "result logic error".into()));
}
}
}
if num_already_known > 0 {
debug!(
log,
"Some inclusion lists already known";
"count" => num_already_known
);
}
if failures.is_empty() {
Ok(())
} else {
Err(warp_utils::reject::indexed_bad_request(
"error processing inclusion list".to_string(),
failures,
))
}
}
fn verify_and_publish_inclusion_list<T: BeaconChainTypes>(
chain: &Arc<BeaconChain<T>>,
inclusion_list: &SignedInclusionList<T::EthSpec>,
seen_timestamp: Duration,
network_tx: &UnboundedSender<NetworkMessage<T::EthSpec>>,
_log: &Logger,
) -> Result<(), Error> {
let verified_inclusion_list = chain
.verify_inclusion_list_for_gossip(inclusion_list)
.map_err(Error::Validation)?;
network_tx
.send(NetworkMessage::Publish {
messages: vec![PubsubMessage::InclusionList(Box::new(
verified_inclusion_list.signed_il.clone(),
))],
})
.map_err(|_| Error::Publication)?;
// TODO(focil) add reprocess logic?
// Notify the validator monitor.
chain.validator_monitor.read().register_api_inclusion_list(
seen_timestamp,
&verified_inclusion_list.signed_il,
&chain.slot_clock,
);
// Store verified IL in the IL cache
chain.on_verified_inclusion_list(verified_inclusion_list.signed_il);
Ok(())
}