Merge branch 'deneb-free-blobs' of https://github.com/sigp/lighthouse into refactor-deneb-networking

This commit is contained in:
realbigsean
2023-07-25 14:47:50 -04:00
12 changed files with 123 additions and 58 deletions

View File

@@ -16,12 +16,28 @@ use types::{
SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
};
/// A block that has been received over RPC. It has 2 internal variants:
///
/// 1. `BlockAndBlobs`: A fully available post deneb block with all the blobs available. This variant
///    is only constructed after making consistency checks between blocks and blobs.
///    Hence, it is fully self contained w.r.t verification. i.e. this block has all the required
///    data to get verified and imported into fork choice.
///
/// 2. `Block`: This can be a fully available pre-deneb block **or** a post-deneb block that may or may
///    not require blobs to be considered fully available.
///
/// Note: We make a distinction over blocks received over gossip because
/// in a post-deneb world, the blobs corresponding to a given block that are received
/// over rpc do not contain the proposer signature for dos resistance.
#[derive(Debug, Clone, Derivative)]
#[derivative(Hash(bound = "E: EthSpec"))]
pub struct RpcBlock<E: EthSpec> {
    // Intentionally private: the inner variant may only be built via the
    // constructors on `RpcBlock`, which enforce block/blob consistency checks.
    block: RpcBlockInner<E>,
}
/// Note: This variant is intentionally private because we want to safely construct the
/// internal variants after applying consistency checks to ensure that the block and blobs
/// are consistent with respect to each other.
#[derive(Debug, Clone, Derivative)]
#[derivative(Hash(bound = "E: EthSpec"))]
enum RpcBlockInner<E: EthSpec> {
@@ -33,12 +49,15 @@ enum RpcBlockInner<E: EthSpec> {
}
impl<E: EthSpec> RpcBlock<E> {
/// Constructs a `Block` variant.
///
/// Used for blocks that carry no blobs, so no block/blob consistency
/// checks are required.
pub fn new_without_blobs(block: Arc<SignedBeaconBlock<E>>) -> Self {
    let inner = RpcBlockInner::Block(block);
    Self { block: inner }
}
/// Constructs a new `BlockAndBlobs` variant after making consistency
/// checks between the provided blocks and blobs.
pub fn new(
block: Arc<SignedBeaconBlock<E>>,
blobs: Option<BlobSidecarList<E>>,
@@ -79,6 +98,16 @@ impl<E: EthSpec> From<SignedBeaconBlock<E>> for RpcBlock<E> {
}
}
/// A block that has gone through all pre-deneb block processing checks including block processing
/// and execution by an EL client. This block hasn't completed data availability checks.
///
/// It contains 2 variants:
/// 1. `Available`: This block has been executed and also contains all data to consider it a
/// fully available block. i.e. for post-deneb, this implies that this contains all the
/// required blobs.
/// 2. `AvailabilityPending`: This block hasn't received all required blobs to consider it a
/// fully available block.
pub enum ExecutedBlock<E: EthSpec> {
Available(AvailableExecutedBlock<E>),
AvailabilityPending(AvailabilityPendingExecutedBlock<E>),
@@ -116,6 +145,8 @@ impl<E: EthSpec> ExecutedBlock<E> {
}
}
/// A block that has completed all pre-deneb block processing checks including verification
/// by an EL client **and** has all requisite blob data to be imported into fork choice.
#[derive(PartialEq)]
pub struct AvailableExecutedBlock<E: EthSpec> {
pub block: AvailableBlock<E>,
@@ -154,6 +185,9 @@ impl<E: EthSpec> AvailableExecutedBlock<E> {
}
}
/// A block that has completed all pre-deneb block processing checks, verification
/// by an EL client but does not have all requisite blob data to get imported into
/// fork choice.
#[derive(Encode, Decode, Clone)]
pub struct AvailabilityPendingExecutedBlock<E: EthSpec> {
#[ssz(with = "ssz_tagged_signed_beacon_block_arc")]

View File

@@ -12,6 +12,8 @@ use slog::{debug, error};
use slot_clock::SlotClock;
use ssz_types::{Error, VariableList};
use std::collections::HashSet;
use std::fmt;
use std::fmt::Debug;
use std::sync::Arc;
use strum::IntoStaticStr;
use task_executor::TaskExecutor;
@@ -101,6 +103,17 @@ pub enum Availability<T: EthSpec> {
Available(Box<AvailableExecutedBlock<T>>),
}
impl<T: EthSpec> Debug for Availability<T> {
    // Manual `Debug` impl: for the `Available` variant we print only the
    // block root instead of the whole executed block, keeping output compact.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::Available(executed) => {
                write!(f, "Available({:?})", executed.import_data.block_root)
            }
            Self::MissingComponents(root) => write!(f, "MissingComponents({})", root),
        }
    }
}
impl<T: EthSpec> Availability<T> {
/// Returns all the blob identifiers associated with an `AvailableBlock`.
/// Returns `None` if availability hasn't been fully satisfied yet.
@@ -325,6 +338,13 @@ pub fn make_available<T: EthSpec>(
})
}
/// Makes the following checks to ensure that the list of blobs corresponds to the given block:
///
/// * Check that a block is post-deneb
/// * Checks that the number of blobs is equal to the length of kzg commitments in the list
/// * Checks that the index, slot, root and kzg_commitment in the block match the blobs in the correct order
///
/// Returns `Ok(())` if all consistency checks pass and an error otherwise.
pub fn consistency_checks<T: EthSpec>(
block: &SignedBeaconBlock<T>,
blobs: &[Arc<BlobSidecar<T>>],

View File

@@ -855,6 +855,7 @@ mod test {
validate_blob_sidecar_for_gossip, verify_kzg_for_blob, GossipVerifiedBlob,
},
block_verification::PayloadVerificationOutcome,
block_verification_types::BlockImportData,
eth1_finalization_cache::Eth1FinalizationData,
test_utils::{BaseHarnessType, BeaconChainHarness, DiskHarnessType},
};
@@ -1297,7 +1298,7 @@ mod test {
// we need blocks with blobs
continue;
}
let root = pending_block.block.block.canonical_root();
let root = pending_block.block.canonical_root();
pending_blocks.push_back(pending_block);
pending_blobs.push_back(blobs);
roots.push_back(root);

View File

@@ -1873,18 +1873,31 @@ where
(deposits, state)
}
pub async fn process_block<B: Into<RpcBlock<E>>>(
pub async fn process_block(
&self,
slot: Slot,
block_root: Hash256,
block: B,
block_contents: BlockContentsTuple<E, FullPayload<E>>,
) -> Result<SignedBeaconBlockHash, BlockError<E>> {
self.set_current_slot(slot);
let (block, blobs) = block_contents;
// Note: we are just dropping signatures here and skipping signature verification.
let blobs_without_signatures = blobs.as_ref().map(|blobs| {
VariableList::from(
blobs
.into_iter()
.map(|blob| blob.message.clone())
.collect::<Vec<_>>(),
)
});
let block_hash: SignedBeaconBlockHash = self
.chain
.process_block(block_root, block.into(), NotifyExecutionLayer::Yes, || {
Ok(())
})
.process_block(
block_root,
RpcBlock::new(Arc::new(block), blobs_without_signatures).unwrap(),
NotifyExecutionLayer::Yes,
|| Ok(()),
)
.await?
.try_into()
.unwrap();
@@ -1892,16 +1905,25 @@ where
Ok(block_hash)
}
pub async fn process_block_result<B: Into<RpcBlock<E>>>(
pub async fn process_block_result(
&self,
block: B,
block_contents: BlockContentsTuple<E, FullPayload<E>>,
) -> Result<SignedBeaconBlockHash, BlockError<E>> {
let wrapped_block = block.into();
let (block, blobs) = block_contents;
// Note: we are just dropping signatures here and skipping signature verification.
let blobs_without_signatures = blobs.as_ref().map(|blobs| {
VariableList::from(
blobs
.into_iter()
.map(|blob| blob.message.clone())
.collect::<Vec<_>>(),
)
});
let block_hash: SignedBeaconBlockHash = self
.chain
.process_block(
wrapped_block.canonical_root(),
wrapped_block,
block.canonical_root(),
RpcBlock::new(Arc::new(block), blobs_without_signatures).unwrap(),
NotifyExecutionLayer::Yes,
|| Ok(()),
)
@@ -1976,24 +1998,16 @@ where
BlockError<E>,
> {
self.set_current_slot(slot);
let ((block, blobs), new_state) = self.make_block(state, slot).await;
// Note: we are just dropping signatures here and skipping signature verification.
let blobs_without_signatures = blobs.as_ref().map(|blobs| {
VariableList::from(
blobs
.into_iter()
.map(|blob| blob.message.clone())
.collect::<Vec<_>>(),
)
});
let (block_contents, new_state) = self.make_block(state, slot).await;
let block_hash = self
.process_block(
slot,
block.canonical_root(),
RpcBlock::new(Arc::new(block.clone()), blobs_without_signatures.clone()).unwrap(),
block_contents.0.canonical_root(),
block_contents.clone(),
)
.await?;
Ok((block_hash, (block, blobs), new_state))
Ok((block_hash, block_contents, new_state))
}
pub fn attest_block(

View File

@@ -221,7 +221,7 @@ impl InvalidPayloadRig {
let head = self.harness.chain.head_snapshot();
let state = head.beacon_state.clone_with_only_committee_caches();
let slot = slot_override.unwrap_or(state.slot() + 1);
let ((block, _), post_state) = self.harness.make_block(state, slot).await;
let ((block, blobs), post_state) = self.harness.make_block(state, slot).await;
let block_root = block.canonical_root();
let set_new_payload = |payload: Payload| match payload {
@@ -285,7 +285,7 @@ impl InvalidPayloadRig {
}
let root = self
.harness
.process_block(slot, block.canonical_root(), block.clone())
.process_block(slot, block.canonical_root(), (block.clone(), blobs.clone()))
.await
.unwrap();
@@ -326,7 +326,7 @@ impl InvalidPayloadRig {
match self
.harness
.process_block(slot, block.canonical_root(), block)
.process_block(slot, block.canonical_root(), (block, blobs))
.await
{
Err(error) if evaluate_error(&error) => (),

View File

@@ -2039,7 +2039,10 @@ async fn garbage_collect_temp_states_from_failed_block() {
// The block should be rejected, but should store a bunch of temporary states.
harness.set_current_slot(block_slot);
harness.process_block_result(block).await.unwrap_err();
harness
.process_block_result((block, None))
.await
.unwrap_err();
assert_eq!(
store.iter_temporary_state_roots().count(),
@@ -2456,14 +2459,14 @@ async fn revert_minority_fork_on_resume() {
harness1.process_attestations(attestations.clone());
harness2.process_attestations(attestations);
let ((block, _), new_state) = harness1.make_block(state, slot).await;
let ((block, blobs), new_state) = harness1.make_block(state, slot).await;
harness1
.process_block(slot, block.canonical_root(), block.clone())
.process_block(slot, block.canonical_root(), (block.clone(), blobs.clone()))
.await
.unwrap();
harness2
.process_block(slot, block.canonical_root(), block.clone())
.process_block(slot, block.canonical_root(), (block.clone(), blobs.clone()))
.await
.unwrap();
@@ -2497,17 +2500,17 @@ async fn revert_minority_fork_on_resume() {
harness2.process_attestations(attestations);
// Minority chain block (no attesters).
let ((block1, _), new_state1) = harness1.make_block(state1, slot).await;
let ((block1, blobs1), new_state1) = harness1.make_block(state1, slot).await;
harness1
.process_block(slot, block1.canonical_root(), block1)
.process_block(slot, block1.canonical_root(), (block1, blobs1))
.await
.unwrap();
state1 = new_state1;
// Majority chain block (all attesters).
let ((block2, _), new_state2) = harness2.make_block(state2, slot).await;
let ((block2, blobs2), new_state2) = harness2.make_block(state2, slot).await;
harness2
.process_block(slot, block2.canonical_root(), block2.clone())
.process_block(slot, block2.canonical_root(), (block2.clone(), blobs2))
.await
.unwrap();
@@ -2560,7 +2563,7 @@ async fn revert_minority_fork_on_resume() {
let initial_split_slot = resumed_harness.chain.store.get_split_slot();
for block in &majority_blocks {
resumed_harness
.process_block_result(block.clone())
.process_block_result((block.clone(), None))
.await
.unwrap();

View File

@@ -636,7 +636,7 @@ pub async fn proposer_boost_re_org_test(
// Applying block C should cause it to become head regardless (re-org or continuation).
let block_root_c = harness
.process_block_result(block_c.clone())
.process_block_result((block_c.clone(), None))
.await
.unwrap()
.into();

View File

@@ -124,7 +124,7 @@ async fn el_error_on_new_payload() {
// Attempt to process the block, which should error.
harness.advance_slot();
assert!(matches!(
harness.process_block_result(block.clone()).await,
harness.process_block_result((block.clone(), None)).await,
Err(BlockError::ExecutionPayloadError(_))
));
@@ -143,7 +143,7 @@ async fn el_error_on_new_payload() {
validation_error: None,
},
);
harness.process_block_result(block).await.unwrap();
harness.process_block_result((block, None)).await.unwrap();
let api_response = tester.client.get_node_syncing().await.unwrap().data;
assert_eq!(api_response.el_offline, Some(false));

View File

@@ -5,9 +5,7 @@ use super::BatchProcessResult;
use super::{manager::BlockProcessType, network_context::SyncNetworkContext};
use crate::metrics;
use crate::network_beacon_processor::ChainSegmentProcessId;
use crate::sync::block_lookups::parent_lookup::ParentLookup;
use crate::sync::block_lookups::single_block_lookup::LookupVerifyError;
use crate::sync::manager::{Id, ResponseType};
use crate::sync::block_lookups::single_block_lookup::LookupId;
use beacon_chain::block_verification_types::{AsBlock, RpcBlock};
use beacon_chain::data_availability_checker::{AvailabilityCheckError, DataAvailabilityChecker};
use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError};

View File

@@ -6,7 +6,7 @@ use crate::sync::block_lookups::{
};
use crate::sync::manager::BlockProcessType;
use crate::sync::network_context::SyncNetworkContext;
use beacon_chain::block_verification_types::{AsBlock, RpcBlock};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::data_availability_checker::DataAvailabilityChecker;
use beacon_chain::{get_block_root, BeaconChainTypes};
use itertools::Itertools;

View File

@@ -7,7 +7,7 @@ use super::range_sync::{BatchId, ByRangeRequestType, ChainId};
use crate::network_beacon_processor::NetworkBeaconProcessor;
use crate::service::{NetworkMessage, RequestId};
use crate::status::ToStatusMessage;
use crate::sync::block_lookups::LookupType;
use crate::sync::block_lookups::{BlobRequestId, BlockRequestId};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState};
use fnv::FnvHashMap;

View File

@@ -1,10 +1,5 @@
#![cfg(not(debug_assertions))]
use std::fmt;
use std::sync::Mutex;
use std::time::Duration;
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::test_utils::{
AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType,
};
@@ -15,6 +10,9 @@ use beacon_chain::{
use fork_choice::{
ForkChoiceStore, InvalidAttestation, InvalidBlock, PayloadVerificationStatus, QueuedAttestation,
};
use std::fmt;
use std::sync::Mutex;
use std::time::Duration;
use store::MemoryStore;
use types::{
test_utils::generate_deterministic_keypair, BeaconBlockRef, BeaconState, ChainSpec, Checkpoint,
@@ -196,21 +194,18 @@ impl ForkChoiceTest {
let validators = self.harness.get_all_validators();
loop {
let slot = self.harness.get_current_slot();
let (block, state_) = self.harness.make_block(state, slot).await;
let (block_contents, state_) = self.harness.make_block(state, slot).await;
state = state_;
if !predicate(block.0.message(), &state) {
if !predicate(block_contents.0.message(), &state) {
break;
}
if let Ok(block_hash) = self
.harness
.process_block_result(RpcBlock::new(block.0, block.1).unwrap())
.await
{
let block = block_contents.0.clone();
if let Ok(block_hash) = self.harness.process_block_result(block_contents).await {
self.harness.attest_block(
&state,
block.0.state_root(),
block.state_root(),
block_hash,
&block.0,
&block,
&validators,
);
self.harness.advance_slot();