Move register_validator off beacon processor

This commit is contained in:
Paul Hauner
2023-07-11 17:11:07 +10:00
parent 311e1bd197
commit 219a61c547

View File

@@ -59,7 +59,10 @@ use std::sync::Arc;
use sysinfo::{System, SystemExt}; use sysinfo::{System, SystemExt};
use system_health::observe_system_health_bn; use system_health::observe_system_health_bn;
use task_spawner::{Priority, TaskSpawner}; use task_spawner::{Priority, TaskSpawner};
use tokio::sync::mpsc::{Sender, UnboundedSender}; use tokio::sync::{
mpsc::{Sender, UnboundedSender},
oneshot,
};
use tokio_stream::{wrappers::BroadcastStream, StreamExt}; use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use types::{ use types::{
Attestation, AttestationData, AttestationShufflingId, AttesterSlashing, BeaconStateError, Attestation, AttestationData, AttestationShufflingId, AttesterSlashing, BeaconStateError,
@@ -3336,124 +3339,162 @@ pub fn serve<T: BeaconChainTypes>(
|task_spawner: TaskSpawner<T::EthSpec>, |task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>, chain: Arc<BeaconChain<T>>,
log: Logger, log: Logger,
register_val_data: Vec<SignedValidatorRegistrationData>| { register_val_data: Vec<SignedValidatorRegistrationData>| async {
task_spawner.spawn_async_with_rejection(Priority::P0, async move { let (tx, rx) = oneshot::channel();
let execution_layer = chain
.execution_layer
.as_ref()
.ok_or(BeaconChainError::ExecutionLayerMissing)
.map_err(warp_utils::reject::beacon_chain_error)?;
let current_slot = chain
.slot_clock
.now_or_genesis()
.ok_or(BeaconChainError::UnableToReadSlot)
.map_err(warp_utils::reject::beacon_chain_error)?;
let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch());
debug!( task_spawner
log, .spawn_async_with_rejection(Priority::P0, async move {
"Received register validator request"; let execution_layer = chain
"count" => register_val_data.len(), .execution_layer
); .as_ref()
.ok_or(BeaconChainError::ExecutionLayerMissing)
.map_err(warp_utils::reject::beacon_chain_error)?;
let current_slot = chain
.slot_clock
.now_or_genesis()
.ok_or(BeaconChainError::UnableToReadSlot)
.map_err(warp_utils::reject::beacon_chain_error)?;
let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch());
let head_snapshot = chain.head_snapshot(); debug!(
let spec = &chain.spec; log,
"Received register validator request";
"count" => register_val_data.len(),
);
let (preparation_data, filtered_registration_data): ( let head_snapshot = chain.head_snapshot();
Vec<ProposerPreparationData>, let spec = &chain.spec;
Vec<SignedValidatorRegistrationData>,
) = register_val_data
.into_iter()
.filter_map(|register_data| {
chain
.validator_index(&register_data.message.pubkey)
.ok()
.flatten()
.and_then(|validator_index| {
let validator = head_snapshot
.beacon_state
.get_validator(validator_index)
.ok()?;
let validator_status = ValidatorStatus::from_validator(
validator,
current_epoch,
spec.far_future_epoch,
)
.superstatus();
let is_active_or_pending =
matches!(validator_status, ValidatorStatus::Pending)
|| matches!(validator_status, ValidatorStatus::Active);
// Filter out validators who are not 'active' or 'pending'. let (preparation_data, filtered_registration_data): (
is_active_or_pending.then_some({ Vec<ProposerPreparationData>,
( Vec<SignedValidatorRegistrationData>,
ProposerPreparationData { ) = register_val_data
validator_index: validator_index as u64, .into_iter()
fee_recipient: register_data.message.fee_recipient, .filter_map(|register_data| {
}, chain
register_data, .validator_index(&register_data.message.pubkey)
.ok()
.flatten()
.and_then(|validator_index| {
let validator = head_snapshot
.beacon_state
.get_validator(validator_index)
.ok()?;
let validator_status = ValidatorStatus::from_validator(
validator,
current_epoch,
spec.far_future_epoch,
) )
.superstatus();
let is_active_or_pending =
matches!(validator_status, ValidatorStatus::Pending)
|| matches!(
validator_status,
ValidatorStatus::Active
);
// Filter out validators who are not 'active' or 'pending'.
is_active_or_pending.then_some({
(
ProposerPreparationData {
validator_index: validator_index as u64,
fee_recipient: register_data
.message
.fee_recipient,
},
register_data,
)
})
}) })
}) })
}) .unzip();
.unzip();
// Update the prepare beacon proposer cache based on this request. // Update the prepare beacon proposer cache based on this request.
execution_layer execution_layer
.update_proposer_preparation(current_epoch, &preparation_data) .update_proposer_preparation(current_epoch, &preparation_data)
.await; .await;
// Call prepare beacon proposer blocking with the latest update in order to make // Call prepare beacon proposer blocking with the latest update in order to make
// sure we have a local payload to fall back to in the event of the blinded block // sure we have a local payload to fall back to in the event of the blinded block
// flow failing. // flow failing.
chain chain
.prepare_beacon_proposer(current_slot) .prepare_beacon_proposer(current_slot)
.await .await
.map_err(|e| { .map_err(|e| {
warp_utils::reject::custom_bad_request(format!( warp_utils::reject::custom_bad_request(format!(
"error updating proposer preparations: {:?}", "error updating proposer preparations: {:?}",
e e
)) ))
})?; })?;
let builder = execution_layer info!(
.builder() log,
.as_ref() "Forwarding register validator request to connected builder";
.ok_or(BeaconChainError::BuilderMissing) "count" => filtered_registration_data.len(),
.map_err(warp_utils::reject::beacon_chain_error)?; );
info!( // It's a waste of a `BeaconProcessor` worker to just
log, // wait on a response from the builder (especially since
"Forwarding register validator request to connected builder"; // they have frequent timeouts). Spawn a new task and
"count" => filtered_registration_data.len(), // send the response back to our original HTTP request
); // task via a channel.
let builder_future = async move {
let builder = chain
.execution_layer
.as_ref()
.ok_or(BeaconChainError::ExecutionLayerMissing)
.map_err(warp_utils::reject::beacon_chain_error)?
.builder()
.as_ref()
.ok_or(BeaconChainError::BuilderMissing)
.map_err(warp_utils::reject::beacon_chain_error)?;
builder builder
.post_builder_validators(&filtered_registration_data) .post_builder_validators(&filtered_registration_data)
.await .await
.map(|resp| warp::reply::json(&resp).into_response()) .map(|resp| warp::reply::json(&resp).into_response())
.map_err(|e| { .map_err(|e| {
warn!( warn!(
log, log,
"Relay error when registering validator(s)"; "Relay error when registering validator(s)";
"num_registrations" => filtered_registration_data.len(), "num_registrations" => filtered_registration_data.len(),
"error" => ?e "error" => ?e
);
// Forward the HTTP status code if we are able to, otherwise fall back
// to a server error.
if let eth2::Error::ServerMessage(message) = e {
if message.code == StatusCode::BAD_REQUEST.as_u16() {
return warp_utils::reject::custom_bad_request(message.message);
} else {
// According to the spec this response should only be a 400 or 500,
// so we fall back to a 500 here.
return warp_utils::reject::custom_server_error(
message.message,
); );
} // Forward the HTTP status code if we are able to, otherwise fall back
} // to a server error.
warp_utils::reject::custom_server_error(format!("{e:?}")) if let eth2::Error::ServerMessage(message) = e {
}) if message.code == StatusCode::BAD_REQUEST.as_u16() {
return warp_utils::reject::custom_bad_request(
message.message,
);
} else {
// According to the spec this response should only be a 400 or 500,
// so we fall back to a 500 here.
return warp_utils::reject::custom_server_error(
message.message,
);
}
}
warp_utils::reject::custom_server_error(format!("{e:?}"))
})
};
tokio::task::spawn(async move { tx.send(builder_future.await) });
// Just send a generic 200 OK from this closure. We'll
// ignore the `Ok` variant and form a proper response
// from what is sent back down the channel.
Ok(warp::reply::reply().into_response())
})
.await?;
// Await a response from the builder without blocking a
// `BeaconProcessor` worker.
rx.await.unwrap_or_else(|_| {
Ok(warp::reply::with_status(
warp::reply::json(&"No response from channel"),
eth2::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response())
}) })
}, },
); );