Merge branch 'unstable' into max-blobs-preset

This commit is contained in:
Pawan Dhananjay
2024-10-21 14:46:00 -07:00
287 changed files with 10187 additions and 9415 deletions

View File

@@ -11,11 +11,9 @@ pub fn build_block_contents<E: EthSpec>(
BeaconBlockResponseWrapper::Blinded(block) => {
Ok(ProduceBlockV3Response::Blinded(block.block))
}
BeaconBlockResponseWrapper::Full(block) => match fork_name {
ForkName::Base | ForkName::Altair | ForkName::Bellatrix | ForkName::Capella => Ok(
ProduceBlockV3Response::Full(FullBlockContents::Block(block.block)),
),
ForkName::Deneb | ForkName::Electra => {
BeaconBlockResponseWrapper::Full(block) => {
if fork_name.deneb_enabled() {
let BeaconBlockResponse {
block,
state: _,
@@ -37,7 +35,11 @@ pub fn build_block_contents<E: EthSpec>(
blobs,
}),
))
} else {
Ok(ProduceBlockV3Response::Full(FullBlockContents::Block(
block.block,
)))
}
},
}
}
}

View File

@@ -4,7 +4,7 @@ use safe_arith::SafeArith;
use state_processing::per_block_processing::get_expected_withdrawals;
use state_processing::state_advance::partial_state_advance;
use std::sync::Arc;
use types::{BeaconState, EthSpec, ForkName, Slot, Withdrawals};
use types::{BeaconState, EthSpec, Slot, Withdrawals};
const MAX_EPOCH_LOOKAHEAD: u64 = 2;
@@ -53,7 +53,8 @@ fn get_next_withdrawals_sanity_checks<T: BeaconChainTypes>(
}
let fork = chain.spec.fork_name_at_slot::<T::EthSpec>(proposal_slot);
if let ForkName::Base | ForkName::Altair | ForkName::Bellatrix = fork {
if !fork.capella_enabled() {
return Err(warp_utils::reject::custom_bad_request(
"the specified state is a pre-capella state.".to_string(),
));

View File

@@ -31,7 +31,7 @@ mod validator_inclusion;
mod validators;
mod version;
use crate::light_client::get_light_client_updates;
use crate::light_client::{get_light_client_bootstrap, get_light_client_updates};
use crate::produce_block::{produce_blinded_block_v2, produce_block_v2, produce_block_v3};
use crate::version::fork_versioned_response;
use beacon_chain::{
@@ -146,13 +146,13 @@ pub struct Config {
pub listen_port: u16,
pub allow_origin: Option<String>,
pub tls_config: Option<TlsConfig>,
pub spec_fork_name: Option<ForkName>,
pub data_dir: PathBuf,
pub sse_capacity_multiplier: usize,
pub enable_beacon_processor: bool,
#[serde(with = "eth2::types::serde_status_code")]
pub duplicate_block_status_code: StatusCode,
pub enable_light_client_server: bool,
pub target_peers: usize,
}
impl Default for Config {
@@ -163,12 +163,12 @@ impl Default for Config {
listen_port: 5052,
allow_origin: None,
tls_config: None,
spec_fork_name: None,
data_dir: PathBuf::from(DEFAULT_ROOT_DIR),
sse_capacity_multiplier: 1,
enable_beacon_processor: true,
duplicate_block_status_code: StatusCode::ACCEPTED,
enable_light_client_server: false,
target_peers: 100,
}
}
}
@@ -1296,7 +1296,7 @@ pub fn serve<T: BeaconChainTypes>(
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
publish_blocks::publish_block(
None,
ProvenancedBlock::local(block_contents),
ProvenancedBlock::local_from_publish_request(block_contents),
chain,
&network_tx,
log,
@@ -1338,7 +1338,7 @@ pub fn serve<T: BeaconChainTypes>(
})?;
publish_blocks::publish_block(
None,
ProvenancedBlock::local(block_contents),
ProvenancedBlock::local_from_publish_request(block_contents),
chain,
&network_tx,
log,
@@ -1373,7 +1373,7 @@ pub fn serve<T: BeaconChainTypes>(
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
publish_blocks::publish_block(
None,
ProvenancedBlock::local(block_contents),
ProvenancedBlock::local_from_publish_request(block_contents),
chain,
&network_tx,
log,
@@ -1417,7 +1417,7 @@ pub fn serve<T: BeaconChainTypes>(
})?;
publish_blocks::publish_block(
None,
ProvenancedBlock::local(block_contents),
ProvenancedBlock::local_from_publish_request(block_contents),
chain,
&network_tx,
log,
@@ -2411,40 +2411,7 @@ pub fn serve<T: BeaconChainTypes>(
block_root: Hash256,
accept_header: Option<api_types::Accept>| {
task_spawner.blocking_response_task(Priority::P1, move || {
let (bootstrap, fork_name) = match chain.get_light_client_bootstrap(&block_root)
{
Ok(Some(res)) => res,
Ok(None) => {
return Err(warp_utils::reject::custom_not_found(
"Light client bootstrap unavailable".to_string(),
));
}
Err(e) => {
return Err(warp_utils::reject::custom_server_error(format!(
"Unable to obtain LightClientBootstrap instance: {e:?}"
)));
}
};
match accept_header {
Some(api_types::Accept::Ssz) => Response::builder()
.status(200)
.body(bootstrap.as_ssz_bytes().into())
.map(|res: Response<Body>| add_ssz_content_type_header(res))
.map_err(|e| {
warp_utils::reject::custom_server_error(format!(
"failed to create response: {}",
e
))
}),
_ => Ok(warp::reply::json(&ForkVersionedResponse {
version: Some(fork_name),
metadata: EmptyMetadata {},
data: bootstrap,
})
.into_response()),
}
.map(|resp| add_consensus_version_header(resp, fork_name))
get_light_client_bootstrap::<T>(chain, &block_root, accept_header)
})
},
);
@@ -2674,7 +2641,6 @@ pub fn serve<T: BeaconChainTypes>(
);
// GET config/spec
let spec_fork_name = ctx.config.spec_fork_name;
let get_config_spec = config_path
.and(warp::path("spec"))
.and(warp::path::end())
@@ -2684,7 +2650,7 @@ pub fn serve<T: BeaconChainTypes>(
move |task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P0, move || {
let config_and_preset =
ConfigAndPreset::from_chain_spec::<T::EthSpec>(&chain.spec, spec_fork_name);
ConfigAndPreset::from_chain_spec::<T::EthSpec>(&chain.spec, None);
Ok(api_types::GenericResponse::from(config_and_preset))
})
},
@@ -2967,8 +2933,16 @@ pub fn serve<T: BeaconChainTypes>(
let is_optimistic = head_execution_status.is_optimistic_or_invalid();
// When determining sync status, make an exception for single-node
// testnets with 0 peers.
let sync_state = network_globals.sync_state.read();
let is_synced = sync_state.is_synced()
|| (sync_state.is_stalled()
&& network_globals.config.target_peers == 0);
drop(sync_state);
let syncing_data = api_types::SyncingData {
is_syncing: !network_globals.sync_state.read().is_synced(),
is_syncing: !is_synced,
is_optimistic,
el_offline,
head_slot,

View File

@@ -1,18 +1,20 @@
use beacon_chain::{BeaconChain, BeaconChainTypes};
use crate::version::{
add_consensus_version_header, add_ssz_content_type_header, fork_versioned_response, V1,
};
use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes};
use eth2::types::{
self as api_types, ChainSpec, ForkVersionedResponse, LightClientUpdate,
LightClientUpdateResponseChunk, LightClientUpdateSszResponse, LightClientUpdatesQuery,
};
use ssz::Encode;
use std::sync::Arc;
use types::{ForkName, Hash256, LightClientBootstrap};
use warp::{
hyper::{Body, Response},
reply::Reply,
Rejection,
};
use crate::version::{add_ssz_content_type_header, fork_versioned_response, V1};
const MAX_REQUEST_LIGHT_CLIENT_UPDATES: u64 = 128;
pub fn get_light_client_updates<T: BeaconChainTypes>(
@@ -62,6 +64,45 @@ pub fn get_light_client_updates<T: BeaconChainTypes>(
}
}
pub fn get_light_client_bootstrap<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
block_root: &Hash256,
accept_header: Option<api_types::Accept>,
) -> Result<Response<Body>, Rejection> {
let (light_client_bootstrap, fork_name) = chain
.get_light_client_bootstrap(block_root)
.map_err(|err| {
let error_message = if let BeaconChainError::LightClientBootstrapError(err) = err {
println!("{:?}", err);
err
} else {
"No LightClientBootstrap found".to_string()
};
warp_utils::reject::custom_not_found(error_message)
})?
.ok_or(warp_utils::reject::custom_not_found(
"No LightClientBootstrap found".to_string(),
))?;
match accept_header {
Some(api_types::Accept::Ssz) => Response::builder()
.status(200)
.body(light_client_bootstrap.as_ssz_bytes().into())
.map(|res: Response<Body>| add_consensus_version_header(res, fork_name))
.map(|res: Response<Body>| add_ssz_content_type_header(res))
.map_err(|e| {
warp_utils::reject::custom_server_error(format!("failed to create response: {}", e))
}),
_ => {
let fork_versioned_response = map_light_client_bootstrap_to_json_response::<T>(
fork_name,
light_client_bootstrap,
)?;
Ok(warp::reply::json(&fork_versioned_response).into_response())
}
}
}
pub fn validate_light_client_updates_request<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
query: &LightClientUpdatesQuery,
@@ -131,6 +172,13 @@ fn map_light_client_update_to_ssz_chunk<T: BeaconChainTypes>(
}
}
fn map_light_client_bootstrap_to_json_response<T: BeaconChainTypes>(
fork_name: ForkName,
light_client_bootstrap: LightClientBootstrap<T::EthSpec>,
) -> Result<ForkVersionedResponse<LightClientBootstrap<T::EthSpec>>, Rejection> {
fork_versioned_response(V1, fork_name, light_client_bootstrap)
}
fn map_light_client_update_to_json_response<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
light_client_update: LightClientUpdate<T::EthSpec>,

View File

@@ -1,13 +1,17 @@
use crate::metrics;
use beacon_chain::block_verification_types::{AsBlock, BlockContentsError};
use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use beacon_chain::block_verification_types::AsBlock;
use beacon_chain::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use beacon_chain::validator_monitor::{get_block_delay_ms, timestamp_now};
use beacon_chain::{
AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes, BlockError,
IntoGossipVerifiedBlockContents, NotifyExecutionLayer,
build_blob_data_column_sidecars, AvailabilityProcessingStatus, BeaconChain, BeaconChainError,
BeaconChainTypes, BlockError, IntoGossipVerifiedBlock, NotifyExecutionLayer,
};
use eth2::types::{
BlobsBundle, BroadcastValidation, ErrorMessage, ExecutionPayloadAndBlobs, FullPayloadContents,
PublishBlockRequest, SignedBlockContents,
};
use eth2::types::{into_full_block_and_blobs, BroadcastValidation, ErrorMessage};
use eth2::types::{FullPayloadContents, PublishBlockRequest};
use execution_layer::ProvenancedPayload;
use lighthouse_network::{NetworkGlobals, PubsubMessage};
use network::NetworkMessage;
@@ -15,40 +19,62 @@ use rand::seq::SliceRandom;
use slog::{debug, error, info, warn, Logger};
use slot_clock::SlotClock;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::UnboundedSender;
use tree_hash::TreeHash;
use types::{
AbstractExecPayload, BeaconBlockRef, BlobSidecarList, BlockImportSource, DataColumnSidecarList,
DataColumnSubnetId, EthSpec, ExecPayload, ExecutionBlockHash, ForkName, FullPayload,
FullPayloadBellatrix, Hash256, RuntimeVariableList, SignedBeaconBlock,
AbstractExecPayload, BeaconBlockRef, BlobSidecar, BlobsList, BlockImportSource,
DataColumnSidecarList, DataColumnSubnetId, EthSpec, ExecPayload, ExecutionBlockHash, ForkName,
FullPayload, FullPayloadBellatrix, Hash256, KzgProofs, SignedBeaconBlock,
SignedBlindedBeaconBlock,
};
use warp::http::StatusCode;
use warp::{reply::Response, Rejection, Reply};
pub enum ProvenancedBlock<T: BeaconChainTypes, B: IntoGossipVerifiedBlockContents<T>> {
pub type UnverifiedBlobs<T> = Option<(
KzgProofs<<T as BeaconChainTypes>::EthSpec>,
BlobsList<<T as BeaconChainTypes>::EthSpec>,
)>;
pub enum ProvenancedBlock<T: BeaconChainTypes, B: IntoGossipVerifiedBlock<T>> {
/// The payload was built using a local EE.
Local(B, PhantomData<T>),
Local(B, UnverifiedBlobs<T>, PhantomData<T>),
/// The payload was build using a remote builder (e.g., via a mev-boost
/// compatible relay).
Builder(B, PhantomData<T>),
Builder(B, UnverifiedBlobs<T>, PhantomData<T>),
}
impl<T: BeaconChainTypes, B: IntoGossipVerifiedBlockContents<T>> ProvenancedBlock<T, B> {
pub fn local(block: B) -> Self {
Self::Local(block, PhantomData)
impl<T: BeaconChainTypes, B: IntoGossipVerifiedBlock<T>> ProvenancedBlock<T, B> {
pub fn local(block: B, blobs: UnverifiedBlobs<T>) -> Self {
Self::Local(block, blobs, PhantomData)
}
pub fn builder(block: B) -> Self {
Self::Builder(block, PhantomData)
pub fn builder(block: B, blobs: UnverifiedBlobs<T>) -> Self {
Self::Builder(block, blobs, PhantomData)
}
}
impl<T: BeaconChainTypes> ProvenancedBlock<T, Arc<SignedBeaconBlock<T::EthSpec>>> {
pub fn local_from_publish_request(request: PublishBlockRequest<T::EthSpec>) -> Self {
match request {
PublishBlockRequest::Block(block) => Self::local(block, None),
PublishBlockRequest::BlockContents(block_contents) => {
let SignedBlockContents {
signed_block,
kzg_proofs,
blobs,
} = block_contents;
Self::local(signed_block, Some((kzg_proofs, blobs)))
}
}
}
}
/// Handles a request from the HTTP API for full blocks.
#[allow(clippy::too_many_arguments)]
pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockContents<T>>(
pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlock<T>>(
block_root: Option<Hash256>,
provenanced_block: ProvenancedBlock<T, B>,
chain: Arc<BeaconChain<T>>,
@@ -60,28 +86,29 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
) -> Result<Response, Rejection> {
let seen_timestamp = timestamp_now();
let (block_contents, is_locally_built_block) = match provenanced_block {
ProvenancedBlock::Local(block_contents, _) => (block_contents, true),
ProvenancedBlock::Builder(block_contents, _) => (block_contents, false),
let (unverified_block, unverified_blobs, is_locally_built_block) = match provenanced_block {
ProvenancedBlock::Local(block, blobs, _) => (block, blobs, true),
ProvenancedBlock::Builder(block, blobs, _) => (block, blobs, false),
};
let provenance = if is_locally_built_block {
"local"
} else {
"builder"
};
let block = block_contents.inner_block().clone();
let delay = get_block_delay_ms(seen_timestamp, block.message(), &chain.slot_clock);
let block = unverified_block.inner_block();
debug!(log, "Signed block received in HTTP API"; "slot" => block.slot());
let malicious_withhold_count = chain.config.malicious_withhold_count;
let chain_cloned = chain.clone();
/* actually publish a block */
let publish_block = move |block: Arc<SignedBeaconBlock<T::EthSpec>>,
blobs_opt: Option<BlobSidecarList<T::EthSpec>>,
data_cols_opt: Option<DataColumnSidecarList<T::EthSpec>>,
sender,
log,
seen_timestamp| {
let publish_block_p2p = move |block: Arc<SignedBeaconBlock<T::EthSpec>>,
should_publish_block: bool,
blob_sidecars: Vec<Arc<BlobSidecar<T::EthSpec>>>,
mut data_column_sidecars: DataColumnSidecarList<T::EthSpec>,
sender,
log,
seen_timestamp|
-> Result<(), BlockError> {
let publish_timestamp = timestamp_now();
let publish_delay = publish_timestamp
.checked_sub(seen_timestamp)
@@ -93,55 +120,48 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
publish_delay,
);
info!(
log,
"Signed block published to network via HTTP API";
"slot" => block.slot(),
"publish_delay_ms" => publish_delay.as_millis()
);
let mut pubsub_messages = if should_publish_block {
info!(
log,
"Signed block published to network via HTTP API";
"slot" => block.slot(),
"blobs_published" => blob_sidecars.len(),
"publish_delay_ms" => publish_delay.as_millis(),
);
vec![PubsubMessage::BeaconBlock(block.clone())]
} else {
vec![]
};
match block.as_ref() {
SignedBeaconBlock::Base(_)
| SignedBeaconBlock::Altair(_)
| SignedBeaconBlock::Bellatrix(_)
| SignedBeaconBlock::Capella(_) => {
crate::publish_pubsub_message(&sender, PubsubMessage::BeaconBlock(block))
crate::publish_pubsub_messages(&sender, pubsub_messages)
.map_err(|_| BlockError::BeaconChainError(BeaconChainError::UnableToPublish))?;
}
SignedBeaconBlock::Deneb(_) | SignedBeaconBlock::Electra(_) => {
let mut pubsub_messages = vec![PubsubMessage::BeaconBlock(block)];
if let Some(blob_sidecars) = blobs_opt {
// Publish blob sidecars
for (blob_index, blob) in blob_sidecars.into_iter().enumerate() {
pubsub_messages.push(PubsubMessage::BlobSidecar(Box::new((
blob_index as u64,
blob,
))));
}
for blob in blob_sidecars.into_iter() {
pubsub_messages.push(PubsubMessage::BlobSidecar(Box::new((blob.index, blob))));
}
if malicious_withhold_count > 0 {
let columns_to_keep = data_column_sidecars
.len()
.saturating_sub(malicious_withhold_count);
// Randomize columns before dropping the last malicious_withhold_count items
data_column_sidecars.shuffle(&mut rand::thread_rng());
drop(data_column_sidecars.drain(columns_to_keep..));
}
if let Some(data_col_sidecars) = data_cols_opt {
let mut data_col_sidecars = data_col_sidecars.to_vec();
if malicious_withhold_count > 0 {
let columns_to_keep = data_col_sidecars
.len()
.saturating_sub(malicious_withhold_count);
// Randomize columns before dropping the last malicious_withhold_count items
data_col_sidecars.shuffle(&mut rand::thread_rng());
data_col_sidecars = data_col_sidecars
.into_iter()
.take(columns_to_keep)
.collect::<Vec<_>>();
}
for data_col in data_col_sidecars {
let subnet = DataColumnSubnetId::from_column_index::<T::EthSpec>(
data_col.index as usize,
&chain_cloned.spec,
);
pubsub_messages.push(PubsubMessage::DataColumnSidecar(Box::new((
subnet, data_col,
))));
}
for data_col in data_column_sidecars {
let subnet = DataColumnSubnetId::from_column_index::<T::EthSpec>(
data_col.index as usize,
&chain_cloned.spec,
);
pubsub_messages.push(PubsubMessage::DataColumnSidecar(Box::new((
subnet, data_col,
))));
}
crate::publish_pubsub_messages(&sender, pubsub_messages)
.map_err(|_| BlockError::BeaconChainError(BeaconChainError::UnableToPublish))?;
@@ -151,75 +171,162 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
};
/* only publish if gossip- and consensus-valid and equivocation-free */
let chain_clone = chain.clone();
let slot = block.message().slot();
let proposer_index = block.message().proposer_index();
let sender_clone = network_tx.clone();
let log_clone = log.clone();
/* if we can form a `GossipVerifiedBlock`, we've passed our basic gossip checks */
let (gossip_verified_block, gossip_verified_blobs, gossip_verified_data_columns) =
match block_contents.into_gossip_verified_block(&chain) {
Ok(b) => b,
Err(BlockContentsError::BlockError(BlockError::BlockIsAlreadyKnown(_)))
| Err(BlockContentsError::BlobError(
beacon_chain::blob_verification::GossipBlobError::RepeatBlob { .. },
)) => {
// Allow the status code for duplicate blocks to be overridden based on config.
return Ok(warp::reply::with_status(
warp::reply::json(&ErrorMessage {
code: duplicate_status_code.as_u16(),
message: "duplicate block".to_string(),
stacktraces: vec![],
}),
duplicate_status_code,
)
.into_response());
}
Err(e) => {
warn!(
log,
"Not publishing block - not gossip verified";
"slot" => slot,
"error" => %e
);
return Err(warp_utils::reject::custom_bad_request(e.to_string()));
}
};
// Convert blobs to either:
//
// 1. Blob sidecars if prior to peer DAS, or
// 2. Data column sidecars if post peer DAS.
let peer_das_enabled = chain.spec.is_peer_das_enabled_for_epoch(block.epoch());
// TODO(das): We could potentially get rid of these conversions and pass `GossipVerified` types
// to `publish_block`, i.e. have `GossipVerified` types in `PubsubMessage`?
// This saves us from extra code and provides guarantee that published
// components are verified.
// Clone here, so we can take advantage of the `Arc`. The block in `BlockContents` is not,
// `Arc`'d but blobs are.
let block = gossip_verified_block.block.block_cloned();
let blobs_opt = gossip_verified_blobs.as_ref().map(|gossip_verified_blobs| {
let blobs = gossip_verified_blobs
.into_iter()
.map(|b| b.clone_blob())
.collect::<Vec<_>>();
RuntimeVariableList::from_vec(
blobs,
chain.spec.max_blobs_per_block(block.epoch()) as usize,
let (blob_sidecars, data_column_sidecars) = match unverified_blobs {
// Pre-PeerDAS: construct blob sidecars for the network.
Some((kzg_proofs, blobs)) if !peer_das_enabled => {
let blob_sidecars = kzg_proofs
.into_iter()
.zip(blobs)
.enumerate()
.map(|(i, (proof, unverified_blob))| {
let _timer = metrics::start_timer(
&beacon_chain::metrics::BLOB_SIDECAR_INCLUSION_PROOF_COMPUTATION,
);
let blob_sidecar =
BlobSidecar::new(i, unverified_blob, &block, proof).map(Arc::new);
blob_sidecar.map_err(|e| {
error!(
log,
"Invalid blob - not publishing block";
"error" => ?e,
"blob_index" => i,
"slot" => slot,
);
warp_utils::reject::custom_bad_request(format!("{e:?}"))
})
})
.collect::<Result<Vec<_>, Rejection>>()?;
(blob_sidecars, vec![])
}
// Post PeerDAS: construct data columns.
Some((_, blobs)) => {
// TODO(das): this is sub-optimal and should likely not be happening prior to gossip
// block publishing.
let data_column_sidecars = build_blob_data_column_sidecars(&chain, &block, blobs)
.map_err(|e| {
error!(
log,
"Invalid data column - not publishing block";
"error" => ?e,
"slot" => slot
);
warp_utils::reject::custom_bad_request(format!("{e:?}"))
})?;
(vec![], data_column_sidecars)
}
None => (vec![], vec![]),
};
// Gossip verify the block and blobs/data columns separately.
let gossip_verified_block_result = unverified_block.into_gossip_verified_block(&chain);
let gossip_verified_blobs = blob_sidecars
.into_iter()
.map(|blob_sidecar| {
let gossip_verified_blob =
GossipVerifiedBlob::new(blob_sidecar.clone(), blob_sidecar.index, &chain);
match gossip_verified_blob {
Ok(blob) => Ok(Some(blob)),
Err(GossipBlobError::RepeatBlob { proposer, .. }) => {
// Log the error but do not abort publication, we may need to publish the block
// or some of the other blobs if the block & blobs are only partially published
// by the other publisher.
debug!(
log,
"Blob for publication already known";
"blob_index" => blob_sidecar.index,
"slot" => slot,
"proposer" => proposer,
);
Ok(None)
}
Err(e) => {
error!(
log,
"Blob for publication is gossip-invalid";
"blob_index" => blob_sidecar.index,
"slot" => slot,
"error" => ?e,
);
Err(warp_utils::reject::custom_bad_request(e.to_string()))
}
}
})
.collect::<Result<Vec<_>, Rejection>>()?;
let gossip_verified_data_columns = data_column_sidecars
.into_iter()
.map(|data_column_sidecar| {
let column_index = data_column_sidecar.index as usize;
let subnet =
DataColumnSubnetId::from_column_index::<T::EthSpec>(column_index, &chain.spec);
let gossip_verified_column =
GossipVerifiedDataColumn::new(data_column_sidecar, subnet.into(), &chain);
match gossip_verified_column {
Ok(blob) => Ok(Some(blob)),
Err(GossipDataColumnError::PriorKnown { proposer, .. }) => {
// Log the error but do not abort publication, we may need to publish the block
// or some of the other data columns if the block & data columns are only
// partially published by the other publisher.
debug!(
log,
"Data column for publication already known";
"column_index" => column_index,
"slot" => slot,
"proposer" => proposer,
);
Ok(None)
}
Err(e) => {
error!(
log,
"Data column for publication is gossip-invalid";
"column_index" => column_index,
"slot" => slot,
"error" => ?e,
);
Err(warp_utils::reject::custom_bad_request(format!("{e:?}")))
}
}
})
.collect::<Result<Vec<_>, Rejection>>()?;
let publishable_blobs = gossip_verified_blobs
.iter()
.flatten()
.map(|b| b.clone_blob())
.collect::<Vec<_>>();
let publishable_data_columns = gossip_verified_data_columns
.iter()
.flatten()
.map(|b| b.clone_data_column())
.collect::<Vec<_>>();
let block_root = block_root.unwrap_or_else(|| {
gossip_verified_block_result.as_ref().map_or_else(
|_| block.canonical_root(),
|verified_block| verified_block.block_root,
)
});
let data_cols_opt = gossip_verified_data_columns
.as_ref()
.map(|gossip_verified_data_columns| {
gossip_verified_data_columns
.into_iter()
.map(|col| col.clone_data_column())
.collect::<Vec<_>>()
});
let block_root = block_root.unwrap_or(gossip_verified_block.block_root);
let should_publish_block = gossip_verified_block_result.is_ok();
if let BroadcastValidation::Gossip = validation_level {
publish_block(
publish_block_p2p(
block.clone(),
blobs_opt.clone(),
data_cols_opt.clone(),
should_publish_block,
publishable_blobs.clone(),
publishable_data_columns.clone(),
sender_clone.clone(),
log.clone(),
seen_timestamp,
@@ -227,65 +334,42 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
.map_err(|_| warp_utils::reject::custom_server_error("unable to publish".into()))?;
}
let block_clone = block.clone();
let publish_fn = move || match validation_level {
BroadcastValidation::Gossip => Ok(()),
BroadcastValidation::Consensus => publish_block(
block_clone,
blobs_opt,
data_cols_opt,
sender_clone,
log_clone,
seen_timestamp,
),
BroadcastValidation::ConsensusAndEquivocation => {
check_slashable(
&chain_clone,
&blobs_opt,
block_root,
&block_clone,
&log_clone,
)?;
publish_block(
block_clone,
blobs_opt,
data_cols_opt,
sender_clone,
log_clone,
let publish_fn_completed = Arc::new(AtomicBool::new(false));
let block_to_publish = block.clone();
let publish_fn = || {
match validation_level {
BroadcastValidation::Gossip => (),
BroadcastValidation::Consensus => publish_block_p2p(
block_to_publish.clone(),
should_publish_block,
publishable_blobs.clone(),
publishable_data_columns.clone(),
sender_clone.clone(),
log.clone(),
seen_timestamp,
)
}
)?,
BroadcastValidation::ConsensusAndEquivocation => {
check_slashable(&chain, block_root, &block_to_publish, &log)?;
publish_block_p2p(
block_to_publish.clone(),
should_publish_block,
publishable_blobs.clone(),
publishable_data_columns.clone(),
sender_clone.clone(),
log.clone(),
seen_timestamp,
)?;
}
};
publish_fn_completed.store(true, Ordering::SeqCst);
Ok(())
};
if let Some(gossip_verified_blobs) = gossip_verified_blobs {
for blob in gossip_verified_blobs {
if let Err(e) = Box::pin(chain.process_gossip_blob(blob)).await {
let msg = format!("Invalid blob: {e}");
return if let BroadcastValidation::Gossip = validation_level {
Err(warp_utils::reject::broadcast_without_import(msg))
} else {
error!(
log,
"Invalid blob provided to HTTP API";
"reason" => &msg
);
Err(warp_utils::reject::custom_bad_request(msg))
};
}
}
}
if let Some(gossip_verified_data_columns) = gossip_verified_data_columns {
let custody_columns_indices = network_globals.custody_columns();
let custody_columns = gossip_verified_data_columns
.into_iter()
.filter(|data_column| custody_columns_indices.contains(&data_column.index()))
.collect();
if let Err(e) = Box::pin(chain.process_gossip_data_columns(custody_columns)).await {
let msg = format!("Invalid data column: {e}");
for blob in gossip_verified_blobs.into_iter().flatten() {
// Importing the blobs could trigger block import and network publication in the case
// where the block was already seen on gossip.
if let Err(e) = Box::pin(chain.process_gossip_blob(blob, &publish_fn)).await {
let msg = format!("Invalid blob: {e}");
return if let BroadcastValidation::Gossip = validation_level {
Err(warp_utils::reject::broadcast_without_import(msg))
} else {
@@ -299,23 +383,149 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
}
}
match Box::pin(chain.process_block(
block_root,
gossip_verified_block,
NotifyExecutionLayer::Yes,
BlockImportSource::HttpApi,
publish_fn,
))
.await
if gossip_verified_data_columns
.iter()
.map(Option::is_some)
.count()
> 0
{
Ok(AvailabilityProcessingStatus::Imported(root)) => {
let sampling_columns_indices = &network_globals.sampling_columns;
let sampling_columns = gossip_verified_data_columns
.into_iter()
.flatten()
.filter(|data_column| sampling_columns_indices.contains(&data_column.index()))
.collect();
// Importing the columns could trigger block import and network publication in the case
// where the block was already seen on gossip.
if let Err(e) =
Box::pin(chain.process_gossip_data_columns(sampling_columns, publish_fn)).await
{
let msg = format!("Invalid data column: {e}");
return if let BroadcastValidation::Gossip = validation_level {
Err(warp_utils::reject::broadcast_without_import(msg))
} else {
error!(
log,
"Invalid data column during block publication";
"reason" => &msg
);
Err(warp_utils::reject::custom_bad_request(msg))
};
}
}
match gossip_verified_block_result {
Ok(gossip_verified_block) => {
let import_result = Box::pin(chain.process_block(
block_root,
gossip_verified_block,
NotifyExecutionLayer::Yes,
BlockImportSource::HttpApi,
publish_fn,
))
.await;
post_block_import_logging_and_response(
import_result,
validation_level,
block,
is_locally_built_block,
seen_timestamp,
&chain,
&log,
)
.await
}
Err(BlockError::DuplicateFullyImported(root)) => {
if publish_fn_completed.load(Ordering::SeqCst) {
post_block_import_logging_and_response(
Ok(AvailabilityProcessingStatus::Imported(root)),
validation_level,
block,
is_locally_built_block,
seen_timestamp,
&chain,
&log,
)
.await
} else {
// None of the components provided in this HTTP request were new, so this was an
// entirely redundant duplicate request. Return a status code indicating this,
// which can be overridden based on config.
Ok(warp::reply::with_status(
warp::reply::json(&ErrorMessage {
code: duplicate_status_code.as_u16(),
message: "duplicate block".to_string(),
stacktraces: vec![],
}),
duplicate_status_code,
)
.into_response())
}
}
Err(BlockError::DuplicateImportStatusUnknown(root)) => {
debug!(
log,
"Block previously seen";
"block_root" => ?root,
"slot" => block.slot(),
);
let import_result = Box::pin(chain.process_block(
block_root,
block.clone(),
NotifyExecutionLayer::Yes,
BlockImportSource::HttpApi,
publish_fn,
))
.await;
post_block_import_logging_and_response(
import_result,
validation_level,
block,
is_locally_built_block,
seen_timestamp,
&chain,
&log,
)
.await
}
Err(e) => {
warn!(
log,
"Not publishing block - not gossip verified";
"slot" => slot,
"error" => %e
);
Err(warp_utils::reject::custom_bad_request(e.to_string()))
}
}
}
async fn post_block_import_logging_and_response<T: BeaconChainTypes>(
result: Result<AvailabilityProcessingStatus, BlockError>,
validation_level: BroadcastValidation,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
is_locally_built_block: bool,
seen_timestamp: Duration,
chain: &Arc<BeaconChain<T>>,
log: &Logger,
) -> Result<Response, Rejection> {
match result {
// The `DuplicateFullyImported` case here captures the case where the block finishes
// being imported after gossip verification. It could be that it finished imported as a
// result of the block being imported from gossip, OR it could be that it finished importing
// after processing of a gossip blob. In the latter case we MUST run fork choice to
// re-compute the head.
Ok(AvailabilityProcessingStatus::Imported(root))
| Err(BlockError::DuplicateFullyImported(root)) => {
let delay = get_block_delay_ms(seen_timestamp, block.message(), &chain.slot_clock);
info!(
log,
"Valid block from HTTP API";
"block_delay" => ?delay,
"root" => format!("{}", root),
"proposer_index" => proposer_index,
"slot" =>slot,
"root" => %root,
"proposer_index" => block.message().proposer_index(),
"slot" => block.slot(),
);
// Notify the validator monitor.
@@ -334,7 +544,7 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
// blocks built with builders we consider the broadcast time to be
// when the blinded block is published to the builder.
if is_locally_built_block {
late_block_logging(&chain, seen_timestamp, block.message(), root, "local", &log)
late_block_logging(chain, seen_timestamp, block.message(), root, "local", log)
}
Ok(warp::reply().into_response())
}
@@ -363,11 +573,10 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
if let BroadcastValidation::Gossip = validation_level {
Err(warp_utils::reject::broadcast_without_import(format!("{e}")))
} else {
let msg = format!("{:?}", e);
error!(
log,
"Invalid block provided to HTTP API";
"reason" => &msg
"reason" => ?e,
);
Err(warp_utils::reject::custom_bad_request(format!(
"Invalid block: {e}"
@@ -389,7 +598,7 @@ pub async fn publish_blinded_block<T: BeaconChainTypes>(
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
) -> Result<Response, Rejection> {
let block_root = blinded_block.canonical_root();
let full_block: ProvenancedBlock<T, PublishBlockRequest<T::EthSpec>> =
let full_block =
reconstruct_block(chain.clone(), block_root, blinded_block, log.clone()).await?;
publish_block::<T, _>(
Some(block_root),
@@ -412,7 +621,7 @@ pub async fn reconstruct_block<T: BeaconChainTypes>(
block_root: Hash256,
block: Arc<SignedBlindedBeaconBlock<T::EthSpec>>,
log: Logger,
) -> Result<ProvenancedBlock<T, PublishBlockRequest<T::EthSpec>>, Rejection> {
) -> Result<ProvenancedBlock<T, Arc<SignedBeaconBlock<T::EthSpec>>>, Rejection> {
let full_payload_opt = if let Ok(payload_header) = block.message().body().execution_payload() {
let el = chain.execution_layer.as_ref().ok_or_else(|| {
warp_utils::reject::custom_server_error("Missing execution layer".to_string())
@@ -478,14 +687,17 @@ pub async fn reconstruct_block<T: BeaconChainTypes>(
match full_payload_opt {
// A block without a payload is pre-merge and we consider it locally
// built.
None => into_full_block_and_blobs(block, None).map(ProvenancedBlock::local),
None => block
.try_into_full_block(None)
.ok_or("Failed to build full block with payload".to_string())
.map(|full_block| ProvenancedBlock::local(Arc::new(full_block), None)),
Some(ProvenancedPayload::Local(full_payload_contents)) => {
into_full_block_and_blobs(block, Some(full_payload_contents))
.map(ProvenancedBlock::local)
into_full_block_and_blobs::<T>(block, full_payload_contents)
.map(|(block, blobs)| ProvenancedBlock::local(block, blobs))
}
Some(ProvenancedPayload::Builder(full_payload_contents)) => {
into_full_block_and_blobs(block, Some(full_payload_contents))
.map(ProvenancedBlock::builder)
into_full_block_and_blobs::<T>(block, full_payload_contents)
.map(|(block, blobs)| ProvenancedBlock::builder(block, blobs))
}
}
.map_err(|e| {
@@ -544,28 +756,11 @@ fn late_block_logging<T: BeaconChainTypes, P: AbstractExecPayload<T::EthSpec>>(
/// Check if any of the blobs or the block are slashable. Returns `BlockError::Slashable` if so.
fn check_slashable<T: BeaconChainTypes>(
chain_clone: &BeaconChain<T>,
blobs_opt: &Option<BlobSidecarList<T::EthSpec>>,
block_root: Hash256,
block_clone: &SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>,
log_clone: &Logger,
) -> Result<(), BlockError> {
let slashable_cache = chain_clone.observed_slashable.read();
if let Some(blobs) = blobs_opt.as_ref() {
blobs.iter().try_for_each(|blob| {
if slashable_cache
.is_slashable(blob.slot(), blob.block_proposer_index(), blob.block_root())
.map_err(|e| BlockError::BeaconChainError(e.into()))?
{
warn!(
log_clone,
"Not publishing equivocating blob";
"slot" => block_clone.slot()
);
return Err(BlockError::Slashable);
}
Ok(())
})?;
};
if slashable_cache
.is_slashable(
block_clone.slot(),
@@ -583,3 +778,38 @@ fn check_slashable<T: BeaconChainTypes>(
}
Ok(())
}
/// Converting from a `SignedBlindedBeaconBlock` into a full `SignedBlockContents`.
#[allow(clippy::type_complexity)]
pub fn into_full_block_and_blobs<T: BeaconChainTypes>(
blinded_block: SignedBlindedBeaconBlock<T::EthSpec>,
maybe_full_payload_contents: FullPayloadContents<T::EthSpec>,
) -> Result<(Arc<SignedBeaconBlock<T::EthSpec>>, UnverifiedBlobs<T>), String> {
match maybe_full_payload_contents {
// This variant implies a pre-deneb block
FullPayloadContents::Payload(execution_payload) => {
let signed_block = blinded_block
.try_into_full_block(Some(execution_payload))
.ok_or("Failed to build full block with payload".to_string())?;
Ok((Arc::new(signed_block), None))
}
// This variant implies a post-deneb block
FullPayloadContents::PayloadAndBlobs(payload_and_blobs) => {
let ExecutionPayloadAndBlobs {
execution_payload,
blobs_bundle,
} = payload_and_blobs;
let signed_block = blinded_block
.try_into_full_block(Some(execution_payload))
.ok_or("Failed to build full block with payload".to_string())?;
let BlobsBundle {
commitments: _,
proofs,
blobs,
} = blobs_bundle;
Ok((Arc::new(signed_block), Some((proofs, blobs))))
}
}
}

View File

@@ -15,12 +15,10 @@ pub fn compute_beacon_block_rewards<T: BeaconChainTypes>(
let block_ref = block.message();
let block_root = block.canonical_root();
let mut state = get_state_before_applying_block(chain.clone(), &block)?;
let rewards = chain
.compute_beacon_block_reward(block_ref, block_root, &mut state)
.compute_beacon_block_reward(block_ref, &mut state)
.map_err(beacon_chain_error)?;
Ok((rewards, execution_optimistic, finalized))

View File

@@ -16,7 +16,7 @@ use lighthouse_network::{
},
rpc::methods::{MetaData, MetaDataV2},
types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield, SyncState},
ConnectedPoint, Enr, NetworkGlobals, PeerId, PeerManager,
ConnectedPoint, Enr, NetworkConfig, NetworkGlobals, PeerId, PeerManager,
};
use logging::test_logger;
use network::{NetworkReceivers, NetworkSenders};
@@ -61,7 +61,8 @@ type Mutator<E> = BoxedMutator<E, MemoryStore<E>, MemoryStore<E>>;
impl<E: EthSpec> InteractiveTester<E> {
pub async fn new(spec: Option<ChainSpec>, validator_count: usize) -> Self {
Self::new_with_initializer_and_mutator(spec, validator_count, None, None).await
Self::new_with_initializer_and_mutator(spec, validator_count, None, None, Config::default())
.await
}
pub async fn new_with_initializer_and_mutator(
@@ -69,9 +70,10 @@ impl<E: EthSpec> InteractiveTester<E> {
validator_count: usize,
initializer: Option<Initializer<E>>,
mutator: Option<Mutator<E>>,
config: Config,
) -> Self {
let mut harness_builder = BeaconChainHarness::builder(E::default())
.spec_or_default(spec)
.spec_or_default(spec.map(Arc::new))
.logger(test_logger())
.mock_execution_layer();
@@ -99,8 +101,9 @@ impl<E: EthSpec> InteractiveTester<E> {
listening_socket,
network_rx,
..
} = create_api_server(
} = create_api_server_with_config(
harness.chain.clone(),
config,
&harness.runtime,
harness.logger().clone(),
)
@@ -131,6 +134,15 @@ pub async fn create_api_server<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
test_runtime: &TestRuntime,
log: Logger,
) -> ApiServer<T, impl Future<Output = ()>> {
create_api_server_with_config(chain, Config::default(), test_runtime, log).await
}
pub async fn create_api_server_with_config<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
http_config: Config,
test_runtime: &TestRuntime,
log: Logger,
) -> ApiServer<T, impl Future<Output = ()>> {
// Use port 0 to allocate a new unused port.
let port = 0;
@@ -145,12 +157,14 @@ pub async fn create_api_server<T: BeaconChainTypes>(
});
let enr_key = CombinedKey::generate_secp256k1();
let enr = Enr::builder().build(&enr_key).unwrap();
let network_config = Arc::new(NetworkConfig::default());
let network_globals = Arc::new(NetworkGlobals::new(
enr.clone(),
meta_data,
vec![],
false,
&log,
network_config,
chain.spec.clone(),
));
@@ -218,12 +232,14 @@ pub async fn create_api_server<T: BeaconChainTypes>(
.unwrap();
let ctx = Arc::new(Context {
// Override several config fields with defaults. If these need to be tweaked in future
// we could remove these overrides.
config: Config {
enabled: true,
listen_port: port,
data_dir: std::path::PathBuf::from(DEFAULT_ROOT_DIR),
enable_light_client_server: true,
..Config::default()
..http_config
},
chain: Some(chain),
network_senders: Some(network_senders),