Merge unstable 20230911 into deneb-free-blobs.

This commit is contained in:
Jimmy Chen
2023-09-11 11:59:13 +10:00
71 changed files with 2298 additions and 956 deletions

View File

@@ -39,7 +39,8 @@ use bytes::Bytes;
use directory::DEFAULT_ROOT_DIR;
use eth2::types::{
self as api_types, BroadcastValidation, EndpointVersion, ForkChoice, ForkChoiceNode,
SignedBlockContents, SkipRandaoVerification, ValidatorId, ValidatorStatus,
SignedBlindedBlockContents, SignedBlockContents, SkipRandaoVerification, ValidatorId,
ValidatorStatus,
};
use lighthouse_network::{types::SyncState, EnrExt, NetworkGlobals, PeerId, PubsubMessage};
use lighthouse_version::version_with_platform;
@@ -139,6 +140,8 @@ pub struct Config {
pub data_dir: PathBuf,
pub sse_capacity_multiplier: usize,
pub enable_beacon_processor: bool,
#[serde(with = "eth2::types::serde_status_code")]
pub duplicate_block_status_code: StatusCode,
}
impl Default for Config {
@@ -154,6 +157,7 @@ impl Default for Config {
data_dir: PathBuf::from(DEFAULT_ROOT_DIR),
sse_capacity_multiplier: 1,
enable_beacon_processor: true,
duplicate_block_status_code: StatusCode::ACCEPTED,
}
}
}
@@ -510,6 +514,8 @@ pub fn serve<T: BeaconChainTypes>(
let task_spawner_filter =
warp::any().map(move || TaskSpawner::new(beacon_processor_send.clone()));
let duplicate_block_status_code = ctx.config.duplicate_block_status_code;
/*
*
* Start of HTTP method definitions.
@@ -1284,11 +1290,11 @@ pub fn serve<T: BeaconChainTypes>(
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
|block_contents: SignedBlockContents<T::EthSpec>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
move |block_contents: SignedBlockContents<T::EthSpec>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
publish_blocks::publish_block(
None,
@@ -1297,9 +1303,9 @@ pub fn serve<T: BeaconChainTypes>(
&network_tx,
log,
BroadcastValidation::default(),
duplicate_block_status_code,
)
.await
.map(|()| warp::reply().into_response())
})
},
);
@@ -1314,11 +1320,11 @@ pub fn serve<T: BeaconChainTypes>(
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
|block_bytes: Bytes,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
move |block_bytes: Bytes,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
let block_contents = SignedBlockContents::<T::EthSpec>::from_ssz_bytes(
&block_bytes,
@@ -1334,9 +1340,9 @@ pub fn serve<T: BeaconChainTypes>(
&network_tx,
log,
BroadcastValidation::default(),
duplicate_block_status_code,
)
.await
.map(|()| warp::reply().into_response())
})
},
);
@@ -1352,12 +1358,12 @@ pub fn serve<T: BeaconChainTypes>(
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
|validation_level: api_types::BroadcastValidationQuery,
block_contents: SignedBlockContents<T::EthSpec>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
move |validation_level: api_types::BroadcastValidationQuery,
block_contents: SignedBlockContents<T::EthSpec>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
publish_blocks::publish_block(
None,
@@ -1366,9 +1372,9 @@ pub fn serve<T: BeaconChainTypes>(
&network_tx,
log,
validation_level.broadcast_validation,
duplicate_block_status_code,
)
.await
.map(|()| warp::reply().into_response())
})
},
);
@@ -1384,12 +1390,12 @@ pub fn serve<T: BeaconChainTypes>(
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
|validation_level: api_types::BroadcastValidationQuery,
block_bytes: Bytes,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
move |validation_level: api_types::BroadcastValidationQuery,
block_bytes: Bytes,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
let block_contents = SignedBlockContents::<T::EthSpec>::from_ssz_bytes(
&block_bytes,
@@ -1405,9 +1411,9 @@ pub fn serve<T: BeaconChainTypes>(
&network_tx,
log,
validation_level.broadcast_validation,
duplicate_block_status_code,
)
.await
.map(|()| warp::reply().into_response())
})
},
);
@@ -1427,11 +1433,11 @@ pub fn serve<T: BeaconChainTypes>(
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
|block_contents: SignedBlockContents<T::EthSpec, BlindedPayload<_>>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
move |block_contents: SignedBlindedBlockContents<T::EthSpec>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
publish_blocks::publish_blinded_block(
block_contents,
@@ -1439,9 +1445,9 @@ pub fn serve<T: BeaconChainTypes>(
&network_tx,
log,
BroadcastValidation::default(),
duplicate_block_status_code,
)
.await
.map(|()| warp::reply().into_response())
})
},
);
@@ -1457,11 +1463,11 @@ pub fn serve<T: BeaconChainTypes>(
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
|block_bytes: Bytes,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
move |block_bytes: Bytes,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
let block =
SignedBlockContents::<T::EthSpec, BlindedPayload<_>>::from_ssz_bytes(
@@ -1477,9 +1483,9 @@ pub fn serve<T: BeaconChainTypes>(
&network_tx,
log,
BroadcastValidation::default(),
duplicate_block_status_code,
)
.await
.map(|()| warp::reply().into_response())
})
},
);
@@ -1495,32 +1501,22 @@ pub fn serve<T: BeaconChainTypes>(
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
|validation_level: api_types::BroadcastValidationQuery,
block_contents: SignedBlockContents<T::EthSpec, BlindedPayload<_>>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async(Priority::P0, async move {
match publish_blocks::publish_blinded_block(
move |validation_level: api_types::BroadcastValidationQuery,
block_contents: SignedBlindedBlockContents<T::EthSpec>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
publish_blocks::publish_blinded_block(
block_contents,
chain,
&network_tx,
log,
validation_level.broadcast_validation,
duplicate_block_status_code,
)
.await
{
Ok(()) => warp::reply().into_response(),
Err(e) => match warp_utils::reject::handle_rejection(e).await {
Ok(reply) => reply.into_response(),
Err(_) => warp::reply::with_status(
StatusCode::INTERNAL_SERVER_ERROR,
eth2::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response(),
},
}
})
},
);
@@ -1531,48 +1527,36 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::query::<api_types::BroadcastValidationQuery>())
.and(warp::path::end())
.and(warp::body::bytes())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
|validation_level: api_types::BroadcastValidationQuery,
block_bytes: Bytes,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| async move {
let block =
match SignedBlockContents::<T::EthSpec, BlindedPayload<_>>::from_ssz_bytes(
&block_bytes,
&chain.spec,
) {
Ok(data) => data,
Err(_) => {
return warp::reply::with_status(
StatusCode::BAD_REQUEST,
eth2::StatusCode::BAD_REQUEST,
)
.into_response();
}
};
match publish_blocks::publish_blinded_block(
block,
chain,
&network_tx,
log,
validation_level.broadcast_validation,
)
.await
{
Ok(()) => warp::reply().into_response(),
Err(e) => match warp_utils::reject::handle_rejection(e).await {
Ok(reply) => reply.into_response(),
Err(_) => warp::reply::with_status(
StatusCode::INTERNAL_SERVER_ERROR,
eth2::StatusCode::INTERNAL_SERVER_ERROR,
move |validation_level: api_types::BroadcastValidationQuery,
block_bytes: Bytes,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
let block =
SignedBlockContents::<T::EthSpec, BlindedPayload<_>>::from_ssz_bytes(
&block_bytes,
&chain.spec,
)
.into_response(),
},
}
.map_err(|e| {
warp_utils::reject::custom_bad_request(format!("invalid SSZ: {e:?}"))
})?;
publish_blocks::publish_blinded_block(
block,
chain,
&network_tx,
log,
validation_level.broadcast_validation,
duplicate_block_status_code,
)
.await
})
},
);