diff --git a/.github/workflows/test-suite.yml b/.github/workflows/test-suite.yml index a7e7792fe4..6761a59ea8 100644 --- a/.github/workflows/test-suite.yml +++ b/.github/workflows/test-suite.yml @@ -129,10 +129,6 @@ jobs: - uses: actions/checkout@v3 - name: Get latest version of stable Rust run: rustup update stable - - name: Install Protoc - uses: arduino/setup-protoc@e52d9eb8f7b63115df1ac544a1376fdbf5a39612 - with: - repo-token: ${{ secrets.GITHUB_TOKEN }} - name: Run network tests for all known forks run: make test-network slasher-tests: @@ -333,6 +329,7 @@ jobs: - name: Run cargo audit to identify known security vulnerabilities reported to the RustSec Advisory Database run: make audit # TODO(sean): re-enable this when we can figure it out with c-kzg +# Issue: https://github.com/sigp/lighthouse/issues/4440 # cargo-vendor: # name: cargo-vendor # runs-on: ubuntu-latest diff --git a/Cargo.lock b/Cargo.lock index b1369d21f1..5cfa03a9de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2251,6 +2251,7 @@ dependencies = [ "ssz_types", "store", "tokio", + "tree_hash", "types", ] @@ -5237,6 +5238,7 @@ dependencies = [ "derivative", "environment", "error-chain", + "eth2", "ethereum-types 0.14.1", "ethereum_ssz", "execution_layer", diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index e788ac8154..10a9ce2a4c 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -118,10 +118,10 @@ use store::{ use task_executor::{ShutdownReason, TaskExecutor}; use tokio_stream::Stream; use tree_hash::TreeHash; -use types::beacon_block_body::{from_block_kzg_commitments, to_block_kzg_commitments}; +use types::beacon_block_body::from_block_kzg_commitments; use types::beacon_state::CloneConfig; -use types::blob_sidecar::{BlobItems, BlobSidecarList, FixedBlobSidecarList}; -use types::consts::deneb::MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS; +use types::blob_sidecar::{BlobSidecarList, FixedBlobSidecarList}; +use types::sidecar::BlobItems; use types::*; pub type ForkChoiceError = fork_choice::Error; @@ -186,7 +186,7 @@ pub enum WhenSlotSkipped { Prev, } -#[derive(Debug, PartialEq)] +#[derive(Copy, Clone, Debug, PartialEq)] pub enum AvailabilityProcessingStatus { MissingComponents(Slot, Hash256), Imported(Hash256), @@ -1175,17 +1175,8 @@ impl BeaconChain { /// Returns the blobs at the given root, if any. /// - /// Returns `Ok(None)` if the blobs and associated block are not found. - /// - /// If we can find the corresponding block in our database, we know whether we *should* have - /// blobs. If we should have blobs and no blobs are found, this will error. If we shouldn't, - /// this will reconstruct an empty `BlobsSidecar`. - /// /// ## Errors - /// - any database read errors - /// - block and blobs are inconsistent in the database - /// - this method is called with a pre-deneb block root - /// - this method is called for a blob that is beyond the prune depth + /// May return a database error. pub fn get_blobs(&self, block_root: &Hash256) -> Result, Error> { match self.store.get_blobs(block_root)? { Some(blobs) => Ok(blobs), @@ -2017,7 +2008,14 @@ impl BeaconChain { blob_sidecar: SignedBlobSidecar, subnet_id: u64, ) -> Result, GossipBlobError> { - blob_verification::validate_blob_sidecar_for_gossip(blob_sidecar, subnet_id, self) + metrics::inc_counter(&metrics::BLOBS_SIDECAR_PROCESSING_REQUESTS); + let _timer = metrics::start_timer(&metrics::BLOBS_SIDECAR_GOSSIP_VERIFICATION_TIMES); + blob_verification::validate_blob_sidecar_for_gossip(blob_sidecar, subnet_id, self).map( + |v| { + metrics::inc_counter(&metrics::BLOBS_SIDECAR_PROCESSING_SUCCESSES); + v + }, + ) } /// Accepts some 'LightClientOptimisticUpdate' from the network and attempts to verify it @@ -2798,9 +2796,6 @@ impl BeaconChain { /// Returns `Ok(block_root)` if the given `unverified_block` was successfully verified and /// imported into the chain. /// - /// For post deneb blocks, this returns a `BlockError::AvailabilityPending` error - /// if the corresponding blobs are not in the required caches. - /// /// Items that implement `IntoExecutionPendingBlock` include: /// /// - `SignedBeaconBlock` @@ -2824,26 +2819,80 @@ impl BeaconChain { // Increment the Prometheus counter for block processing requests. metrics::inc_counter(&metrics::BLOCK_PROCESSING_REQUESTS); + let block_slot = unverified_block.block().slot(); + + // A small closure to group the verification and import errors. let chain = self.clone(); + let import_block = async move { + let execution_pending = unverified_block.into_execution_pending_block( + block_root, + &chain, + notify_execution_layer, + )?; + publish_fn()?; + let executed_block = chain.into_executed_block(execution_pending).await?; + match executed_block { + ExecutedBlock::Available(block) => { + self.import_available_block(Box::new(block)).await + } + ExecutedBlock::AvailabilityPending(block) => { + self.check_block_availability_and_import(block).await + } + } + }; - let execution_pending = unverified_block.into_execution_pending_block( - block_root, - &chain, - notify_execution_layer, - )?; + // Verify and import the block. + match import_block.await { + // The block was successfully verified and imported. Yay. + Ok(status @ AvailabilityProcessingStatus::Imported(block_root)) => { + trace!( + self.log, + "Beacon block imported"; + "block_root" => ?block_root, + "block_slot" => block_slot, + ); - publish_fn()?; + // Increment the Prometheus counter for block processing successes. + metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES); - let executed_block = self - .clone() - .into_executed_block(execution_pending) - .await - .map_err(|e| self.handle_block_error(e))?; + Ok(status) + } + Ok(status @ AvailabilityProcessingStatus::MissingComponents(slot, block_root)) => { + trace!( + self.log, + "Beacon block awaiting blobs"; + "block_root" => ?block_root, + "block_slot" => slot, + ); - match executed_block { - ExecutedBlock::Available(block) => self.import_available_block(Box::new(block)).await, - ExecutedBlock::AvailabilityPending(block) => { - self.check_block_availability_and_import(block).await + Ok(status) + } + Err(e @ BlockError::BeaconChainError(BeaconChainError::TokioJoin(_))) => { + debug!( + self.log, + "Beacon block processing cancelled"; + "error" => ?e, + ); + Err(e) + } + // There was an error whilst attempting to verify and import the block. The block might + // be partially verified or partially imported. + Err(BlockError::BeaconChainError(e)) => { + crit!( + self.log, + "Beacon block processing error"; + "error" => ?e, + ); + Err(BlockError::BeaconChainError(e)) + } + // The block failed verification. + Err(other) => { + debug!( + self.log, + "Beacon block rejected"; + "reason" => other.to_string(), + ); + Err(other) } } } @@ -2903,35 +2952,6 @@ impl BeaconChain { )) } - fn handle_block_error(&self, e: BlockError) -> BlockError { - match e { - e @ BlockError::BeaconChainError(BeaconChainError::TokioJoin(_)) => { - debug!( - self.log, - "Beacon block processing cancelled"; - "error" => ?e, - ); - e - } - BlockError::BeaconChainError(e) => { - crit!( - self.log, - "Beacon block processing error"; - "error" => ?e, - ); - BlockError::BeaconChainError(e) - } - other => { - trace!( - self.log, - "Beacon block rejected"; - "reason" => other.to_string(), - ); - other - } - } - } - /* Import methods */ /// Checks if the block is available, and imports immediately if so, otherwise caches the block @@ -3017,11 +3037,9 @@ impl BeaconChain { consensus_context, } = import_data; - let slot = block.slot(); - // import let chain = self.clone(); - let result = self + let block_root = self .spawn_blocking_handle( move || { chain.import_block( @@ -3037,29 +3055,8 @@ impl BeaconChain { }, "payload_verification_handle", ) - .await - .map_err(|e| { - let b = BlockError::from(e); - self.handle_block_error(b) - })?; - - match result { - // The block was successfully verified and imported. Yay. - Ok(block_root) => { - trace!( - self.log, - "Beacon block imported"; - "block_root" => ?block_root, - "block_slot" => slot, - ); - - // Increment the Prometheus counter for block processing successes. - metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES); - - Ok(AvailabilityProcessingStatus::Imported(block_root)) - } - Err(e) => Err(self.handle_block_error(e)), - } + .await??; + Ok(AvailabilityProcessingStatus::Imported(block_root)) } /// Accepts a fully-verified and available block and imports it into the chain without performing any @@ -3248,7 +3245,7 @@ impl BeaconChain { if let Some(blobs) = blobs { if !blobs.is_empty() { - info!( + debug!( self.log, "Writing blobs to store"; "block_root" => %block_root, "count" => blobs.len(), @@ -4111,10 +4108,10 @@ impl BeaconChain { let proposal_epoch = proposal_slot.epoch(T::EthSpec::slots_per_epoch()); let head_block_root = cached_head.head_block_root(); - let parent_block_root = cached_head.parent_block_root(); + let parent_beacon_block_root = cached_head.parent_block_root(); // The proposer head must be equal to the canonical head or its parent. - if proposer_head != head_block_root && proposer_head != parent_block_root { + if proposer_head != head_block_root && proposer_head != parent_beacon_block_root { warn!( self.log, "Unable to compute payload attributes"; @@ -4193,7 +4190,7 @@ impl BeaconChain { // Get the `prev_randao` and parent block number. let head_block_number = cached_head.head_block_number()?; - let (prev_randao, parent_block_number) = if proposer_head == parent_block_root { + let (prev_randao, parent_block_number) = if proposer_head == parent_beacon_block_root { ( cached_head.parent_random()?, head_block_number.saturating_sub(1), @@ -4206,7 +4203,7 @@ impl BeaconChain { proposer_index, prev_randao, parent_block_number, - parent_beacon_block_root: parent_block_root, + parent_beacon_block_root, })) } @@ -4926,7 +4923,6 @@ impl BeaconChain { .map_err(|_| BlockProductionError::InvalidPayloadFork)?, bls_to_execution_changes: bls_to_execution_changes.into(), blob_kzg_commitments: kzg_commitments - .map(to_block_kzg_commitments::) .ok_or(BlockProductionError::InvalidPayloadFork)?, }, }), @@ -6283,31 +6279,7 @@ impl BeaconChain { /// The epoch at which we require a data availability check in block processing. /// `None` if the `Deneb` fork is disabled. pub fn data_availability_boundary(&self) -> Option { - self.spec.deneb_fork_epoch.and_then(|fork_epoch| { - self.epoch().ok().map(|current_epoch| { - std::cmp::max( - fork_epoch, - current_epoch.saturating_sub(*MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS), - ) - }) - }) - } - - /// Returns true if the given epoch lies within the da boundary and false otherwise. - pub fn block_needs_da_check(&self, block_epoch: Epoch) -> bool { - self.data_availability_boundary() - .map_or(false, |da_epoch| block_epoch >= da_epoch) - } - - /// Returns `true` if we are at or past the `Deneb` fork. This will always return `false` if - /// the `Deneb` fork is disabled. - pub fn is_data_availability_check_required(&self) -> Result { - let current_epoch = self.epoch()?; - Ok(self - .spec - .deneb_fork_epoch - .map(|fork_epoch| fork_epoch <= current_epoch) - .unwrap_or(false)) + self.data_availability_checker.data_availability_boundary() } } diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index 7c71e77ff7..70c2cec951 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -18,7 +18,7 @@ use std::borrow::Cow; use types::blob_sidecar::BlobIdentifier; use types::{ BeaconState, BeaconStateError, BlobSidecar, BlobSidecarList, ChainSpec, CloneConfig, EthSpec, - Hash256, KzgCommitment, RelativeEpoch, SignedBlobSidecar, Slot, + Hash256, RelativeEpoch, SignedBlobSidecar, Slot, }; /// An error occurred while validating a gossip blob. @@ -172,6 +172,9 @@ impl GossipVerifiedBlob { pub fn slot(&self) -> Slot { self.blob.message.slot } + pub fn proposer_index(&self) -> u64 { + self.blob.message.proposer_index + } } pub fn validate_blob_sidecar_for_gossip( @@ -497,9 +500,6 @@ impl KzgVerifiedBlob { pub fn clone_blob(&self) -> Arc> { self.blob.clone() } - pub fn kzg_commitment(&self) -> KzgCommitment { - self.blob.kzg_commitment - } pub fn block_root(&self) -> Hash256 { self.blob.block_root } diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs index 0e56de7472..587fd2d1c5 100644 --- a/beacon_node/beacon_chain/src/block_verification_types.rs +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -162,6 +162,13 @@ impl ExecutedBlock { Self::AvailabilityPending(pending) => &pending.block, } } + + pub fn block_root(&self) -> Hash256 { + match self { + ExecutedBlock::AvailabilityPending(pending) => pending.import_data.block_root, + ExecutedBlock::Available(available) => available.import_data.block_root, + } + } } /// A block that has completed all pre-deneb block processing checks including verification diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 45c4f42411..1bdcc78a38 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -883,7 +883,6 @@ where slasher: self.slasher.clone(), validator_monitor: RwLock::new(validator_monitor), genesis_backfill_slot, - //TODO(sean) should we move kzg solely to the da checker? data_availability_checker: Arc::new( DataAvailabilityChecker::new(slot_clock, kzg.clone(), store, self.spec) .map_err(|e| format!("Error initializing DataAvailabiltyChecker: {:?}", e))?, diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 353b26e036..8488b98ffe 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -24,9 +24,9 @@ use types::{BlobSidecarList, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlo mod overflow_lru_cache; /// The LRU Cache stores `PendingComponents` which can store up to -/// `MAX_BLOBS_PER_BLOCK = 4` blobs each. A `BlobSidecar` is 0.131256 MB. So -/// the maximum size of a `PendingComponents` is ~ 0.525024 MB. Setting this -/// to 1024 means the maximum size of the cache is ~ 0.5 GB. But the cache +/// `MAX_BLOBS_PER_BLOCK = 6` blobs each. A `BlobSidecar` is 0.131256 MB. So +/// the maximum size of a `PendingComponents` is ~ 0.787536 MB. Setting this +/// to 1024 means the maximum size of the cache is ~ 0.8 GB. But the cache /// will target a size of less than 75% of capacity. pub const OVERFLOW_LRU_CAPACITY: usize = 1024; @@ -79,11 +79,10 @@ impl From for AvailabilityCheckError { } } -/// This cache contains -/// - blobs that have been gossip verified -/// - commitments for blocks that have been gossip verified, but the commitments themselves -/// have not been verified against blobs -/// - blocks that have been fully verified and only require a data availability check +/// This includes a cache for any blocks or blobs that have been received over gossip or RPC +/// and are awaiting more components before they can be imported. Additionally the +/// `DataAvailabilityChecker` is responsible for KZG verification of block components as well as +/// checking whether a "availability check" is required at all. pub struct DataAvailabilityChecker { availability_cache: Arc>, slot_clock: T::SlotClock, @@ -112,18 +111,6 @@ impl Debug for Availability { } } -impl Availability { - /// Returns all the blob identifiers associated with an `AvailableBlock`. - /// Returns `None` if avaiability hasn't been fully satisfied yet. - pub fn get_available_blob_ids(&self) -> Option> { - if let Self::Available(block) = self { - Some(block.get_all_blob_ids()) - } else { - None - } - } -} - impl DataAvailabilityChecker { pub fn new( slot_clock: T::SlotClock, @@ -140,10 +127,13 @@ impl DataAvailabilityChecker { }) } + /// Checks if the given block root is cached. pub fn has_block(&self, block_root: &Hash256) -> bool { self.availability_cache.has_block(block_root) } + /// Checks which blob ids are still required for a given block root, taking any cached + /// components into consideration. pub fn get_missing_blob_ids_checking_cache( &self, block_root: Hash256, @@ -164,7 +154,7 @@ impl DataAvailabilityChecker { ) -> Option> { let epoch = self.slot_clock.now()?.epoch(T::EthSpec::slots_per_epoch()); - self.da_check_required(epoch).then(|| { + self.da_check_required_for_epoch(epoch).then(|| { block_opt .map(|block| { block.get_filtered_blob_ids(Some(block_root), |i, _| { @@ -194,6 +184,8 @@ impl DataAvailabilityChecker { self.availability_cache.peek_blob(blob_id) } + /// Put a list of blobs received via RPC into the availability cache. This performs KZG + /// verification on the blobs in the list. pub fn put_rpc_blobs( &self, block_root: Hash256, @@ -232,8 +224,8 @@ impl DataAvailabilityChecker { .put_kzg_verified_blobs(kzg_verified_blob.block_root(), &[kzg_verified_blob]) } - /// Check if we have all the blobs for a block. If we do, return the Availability variant that - /// triggers import of the block. + /// Check if we have all the blobs for a block. Returns `Availability` which has information + /// about whether all components have been received or more are required. pub fn put_pending_executed_block( &self, executed_block: AvailabilityPendingExecutedBlock, @@ -282,7 +274,7 @@ impl DataAvailabilityChecker { /// Determines the blob requirements for a block. Answers the question: "Does this block require /// blobs?". fn blobs_required_for_block(&self, block: &SignedBeaconBlock) -> bool { - let block_within_da_period = self.da_check_required(block.epoch()); + let block_within_da_period = self.da_check_required_for_epoch(block.epoch()); let block_has_kzg_commitments = block .message() .body() @@ -308,7 +300,7 @@ impl DataAvailabilityChecker { } /// Returns true if the given epoch lies within the da boundary and false otherwise. - pub fn da_check_required(&self, block_epoch: Epoch) -> bool { + pub fn da_check_required_for_epoch(&self, block_epoch: Epoch) -> bool { self.data_availability_boundary() .map_or(false, |da_epoch| block_epoch >= da_epoch) } diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 8b4493d49d..683ec68c28 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -270,11 +270,6 @@ pub enum BlockProductionError { BlockingFailed(execution_layer::Error), TerminalPoWBlockLookupFailed(execution_layer::Error), GetPayloadFailed(execution_layer::Error), - GetBlobsFailed(execution_layer::Error), - BlobPayloadMismatch { - blob_block_hash: ExecutionBlockHash, - payload_block_hash: ExecutionBlockHash, - }, FailedToReadFinalizedBlock(store::Error), MissingFinalizedBlock(Hash256), BlockTooLarge(usize), @@ -283,8 +278,7 @@ pub enum BlockProductionError { MissingSyncAggregate, MissingExecutionPayload, MissingKzgCommitment(String), - MissingKzgProof(String), - TokioJoin(tokio::task::JoinError), + TokioJoin(JoinError), BeaconChain(BeaconChainError), InvalidPayloadFork, TrustedSetupNotInitialized, diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs index 736a2b4f63..98aaf7015b 100644 --- a/beacon_node/beacon_chain/src/historical_blocks.rs +++ b/beacon_node/beacon_chain/src/historical_blocks.rs @@ -89,10 +89,10 @@ impl BeaconChain { return Ok(0); } - let n_blobs_to_import = blocks_to_import + let n_blobs_lists_to_import = blocks_to_import .iter() - .map(|available_block| available_block.blobs().map_or(0, |blobs| blobs.len())) - .sum::(); + .filter(|available_block| available_block.blobs().is_some()) + .count(); let mut expected_block_root = anchor_info.oldest_block_parent; let mut prev_block_slot = anchor_info.oldest_block_slot; @@ -100,7 +100,7 @@ impl BeaconChain { ChunkWriter::::new(&self.store.cold_db, prev_block_slot.as_usize())?; let mut cold_batch = Vec::with_capacity(blocks_to_import.len()); - let mut hot_batch = Vec::with_capacity(blocks_to_import.len() + n_blobs_to_import); + let mut hot_batch = Vec::with_capacity(blocks_to_import.len() + n_blobs_lists_to_import); let mut signed_blocks = Vec::with_capacity(blocks_to_import.len()); for available_block in blocks_to_import.into_iter().rev() { diff --git a/beacon_node/beacon_chain/src/state_advance_timer.rs b/beacon_node/beacon_chain/src/state_advance_timer.rs index 3d21a14c83..f73223fa54 100644 --- a/beacon_node/beacon_chain/src/state_advance_timer.rs +++ b/beacon_node/beacon_chain/src/state_advance_timer.rs @@ -186,7 +186,7 @@ async fn state_advance_timer( head_slot, }) => debug!( log, - "Refused to advance head state. Chain may be syncing or lagging too far behind"; + "Refused to advance head state"; "head_slot" => head_slot, "current_slot" => current_slot, ), diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 87a3ec3b58..7b993f3cba 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -60,6 +60,7 @@ use std::time::Duration; use task_executor::TaskExecutor; use tokio::sync::mpsc; use tokio::sync::mpsc::error::TrySendError; +use types::consts::deneb::MAX_BLOBS_PER_BLOCK; use types::{Attestation, Hash256, SignedAggregateAndProof, SubnetId}; use types::{EthSpec, Slot}; use work_reprocessing_queue::IgnoredRpcBlock; @@ -148,7 +149,10 @@ const MAX_SYNC_CONTRIBUTION_QUEUE_LEN: usize = 1024; /// The maximum number of queued `SignedBeaconBlock` objects received from the network RPC that /// will be stored before we start dropping them. const MAX_RPC_BLOCK_QUEUE_LEN: usize = 1_024; -const MAX_RPC_BLOB_QUEUE_LEN: usize = 1_024 * 4; + +/// The maximum number of queued `BlobSidecar` objects received from the network RPC that +/// will be stored before we start dropping them. +const MAX_RPC_BLOB_QUEUE_LEN: usize = 1_024; /// The maximum number of queued `Vec` objects received during syncing that will /// be stored before we start dropping them. @@ -162,13 +166,18 @@ const MAX_STATUS_QUEUE_LEN: usize = 1_024; /// will be stored before we start dropping them. const MAX_BLOCKS_BY_RANGE_QUEUE_LEN: usize = 1_024; -const MAX_BLOBS_BY_RANGE_QUEUE_LEN: usize = 1_024; +/// The maximum number of queued `BlobsByRangeRequest` objects received from the network RPC that +/// will be stored before we start dropping them. +const MAX_BLOBS_BY_RANGE_QUEUE_LEN: usize = + MAX_BLOCKS_BY_RANGE_QUEUE_LEN * MAX_BLOBS_PER_BLOCK as usize; /// The maximum number of queued `BlocksByRootRequest` objects received from the network RPC that /// will be stored before we start dropping them. const MAX_BLOCKS_BY_ROOTS_QUEUE_LEN: usize = 1_024; -const MAX_BLOCK_AND_BLOBS_BY_ROOTS_QUEUE_LEN: usize = 1_024; +/// The maximum number of queued `BlobsByRootRequest` objects received from the network RPC that +/// will be stored before we start dropping them. +const MAX_BLOBS_BY_ROOTS_QUEUE_LEN: usize = 1_024; /// Maximum number of `SignedBlsToExecutionChange` messages to queue before dropping them. /// @@ -808,7 +817,7 @@ impl BeaconProcessor { let mut status_queue = FifoQueue::new(MAX_STATUS_QUEUE_LEN); let mut bbrange_queue = FifoQueue::new(MAX_BLOCKS_BY_RANGE_QUEUE_LEN); let mut bbroots_queue = FifoQueue::new(MAX_BLOCKS_BY_ROOTS_QUEUE_LEN); - let mut blbroots_queue = FifoQueue::new(MAX_BLOCK_AND_BLOBS_BY_ROOTS_QUEUE_LEN); + let mut blbroots_queue = FifoQueue::new(MAX_BLOBS_BY_ROOTS_QUEUE_LEN); let mut blbrange_queue = FifoQueue::new(MAX_BLOBS_BY_RANGE_QUEUE_LEN); let mut gossip_bls_to_execution_change_queue = @@ -1294,6 +1303,10 @@ impl BeaconProcessor { &metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_QUEUE_TOTAL, gossip_block_queue.len() as i64, ); + metrics::set_gauge( + &metrics::BEACON_PROCESSOR_GOSSIP_BLOB_QUEUE_TOTAL, + gossip_block_queue.len() as i64, + ); metrics::set_gauge( &metrics::BEACON_PROCESSOR_RPC_BLOCK_QUEUE_TOTAL, rpc_block_queue.len() as i64, diff --git a/beacon_node/builder_client/src/lib.rs b/beacon_node/builder_client/src/lib.rs index aebc3f5e2b..28cd1fe486 100644 --- a/beacon_node/builder_client/src/lib.rs +++ b/beacon_node/builder_client/src/lib.rs @@ -1,5 +1,5 @@ use eth2::types::builder_bid::SignedBuilderBid; -use eth2::types::payload::FullPayloadContents; +use eth2::types::FullPayloadContents; use eth2::types::{ BlindedPayload, EthSpec, ExecutionBlockHash, ForkVersionedResponse, PublicKeyBytes, SignedBlockContents, SignedValidatorRegistrationData, Slot, diff --git a/beacon_node/client/src/config.rs b/beacon_node/client/src/config.rs index a5d36e9436..72c918a289 100644 --- a/beacon_node/client/src/config.rs +++ b/beacon_node/client/src/config.rs @@ -51,11 +51,6 @@ pub struct Config { /// Path where the freezer database will be located. pub freezer_db_path: Option, /// Path where the blobs database will be located if blobs should be in a separate database. - /// - /// The capacity this location should hold varies with the data availability boundary. It - /// should be able to store < 69 GB when [MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS](types::consts::deneb::MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS) is 4096 - /// epochs of 32 slots (up to 131072 bytes data per blob and up to 4 blobs per block, 88 bytes - /// of [BlobsSidecar](types::BlobsSidecar) metadata per block). pub blobs_db_path: Option, pub log_file: PathBuf, /// If true, the node will use co-ordinated junk for eth1 values. diff --git a/beacon_node/execution_layer/src/block_hash.rs b/beacon_node/execution_layer/src/block_hash.rs index 007fbfd76b..5ba61beafc 100644 --- a/beacon_node/execution_layer/src/block_hash.rs +++ b/beacon_node/execution_layer/src/block_hash.rs @@ -7,8 +7,8 @@ use ethers_core::utils::rlp::RlpStream; use keccak_hash::KECCAK_EMPTY_LIST_RLP; use triehash::ordered_trie_root; use types::{ - map_execution_block_header_fields_except_withdrawals, Address, BeaconBlockRef, EthSpec, - ExecutionBlockHash, ExecutionBlockHeader, ExecutionPayloadRef, Hash256, Hash64, Uint256, + map_execution_block_header_fields_base, Address, BeaconBlockRef, EthSpec, ExecutionBlockHash, + ExecutionBlockHeader, ExecutionPayloadRef, Hash256, Hash64, Uint256, }; impl ExecutionLayer { @@ -104,7 +104,7 @@ pub fn rlp_encode_withdrawal(withdrawal: &JsonWithdrawal) -> Vec { pub fn rlp_encode_block_header(header: &ExecutionBlockHeader) -> Vec { let mut rlp_header_stream = RlpStream::new(); rlp_header_stream.begin_unbounded_list(); - map_execution_block_header_fields_except_withdrawals!(&header, |_, field| { + map_execution_block_header_fields_base!(&header, |_, field| { rlp_header_stream.append(field); }); if let Some(withdrawals_root) = &header.withdrawals_root { diff --git a/beacon_node/execution_layer/src/engine_api.rs b/beacon_node/execution_layer/src/engine_api.rs index 8952b15ed0..5fd0610b37 100644 --- a/beacon_node/execution_layer/src/engine_api.rs +++ b/beacon_node/execution_layer/src/engine_api.rs @@ -5,18 +5,19 @@ use crate::http::{ ENGINE_GET_PAYLOAD_V1, ENGINE_GET_PAYLOAD_V2, ENGINE_GET_PAYLOAD_V3, ENGINE_NEW_PAYLOAD_V1, ENGINE_NEW_PAYLOAD_V2, ENGINE_NEW_PAYLOAD_V3, }; -use crate::BlobTxConversionError; use eth2::types::{ - SsePayloadAttributes, SsePayloadAttributesV1, SsePayloadAttributesV2, SsePayloadAttributesV3, + BlobsBundle, SsePayloadAttributes, SsePayloadAttributesV1, SsePayloadAttributesV2, + SsePayloadAttributesV3, }; use ethers_core::types::Transaction; -use ethers_core::utils::rlp::{self, Decodable, Rlp}; +use ethers_core::utils::rlp; +use ethers_core::utils::rlp::{Decodable, Rlp}; use http::deposit_methods::RpcError; pub use json_structures::{JsonWithdrawal, TransitionConfigurationV1}; use pretty_reqwest_error::PrettyReqwestError; use reqwest::StatusCode; use serde::{Deserialize, Serialize}; -use state_processing::per_block_processing::deneb::deneb::kzg_commitment_to_versioned_hash; +use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_hash; use std::convert::TryFrom; use strum::IntoStaticStr; use superstruct::superstruct; @@ -26,8 +27,8 @@ pub use types::{ Withdrawal, Withdrawals, }; use types::{ - BeaconStateError, BlobsBundle, ExecutionPayloadCapella, ExecutionPayloadDeneb, - ExecutionPayloadMerge, KzgProofs, VersionedHash, + BeaconStateError, ExecutionPayloadCapella, ExecutionPayloadDeneb, ExecutionPayloadMerge, + KzgProofs, VersionedHash, }; pub mod auth; @@ -63,7 +64,6 @@ pub enum Error { RequiredMethodUnsupported(&'static str), UnsupportedForkVariant(String), RlpDecoderError(rlp::DecoderError), - BlobTxConversionError(BlobTxConversionError), } impl From for Error { @@ -109,12 +109,6 @@ impl From for Error { } } -impl From for Error { - fn from(e: BlobTxConversionError) -> Self { - Error::BlobTxConversionError(e) - } -} - #[derive(Clone, Copy, Debug, PartialEq, IntoStaticStr)] #[strum(serialize_all = "snake_case")] pub enum PayloadStatusV1Status { @@ -223,7 +217,8 @@ impl TryFrom> for ExecutionBlockWithTransactions .transactions .iter() .map(|tx| Transaction::decode(&Rlp::new(tx))) - .collect::, _>>()?, + .collect::, _>>() + .unwrap_or_else(|_| Vec::new()), }), ExecutionPayload::Capella(block) => { Self::Capella(ExecutionBlockWithTransactionsCapella { @@ -244,7 +239,8 @@ impl TryFrom> for ExecutionBlockWithTransactions .transactions .iter() .map(|tx| Transaction::decode(&Rlp::new(tx))) - .collect::, _>>()?, + .collect::, _>>() + .unwrap_or_else(|_| Vec::new()), withdrawals: Vec::from(block.withdrawals) .into_iter() .map(|withdrawal| withdrawal.into()) @@ -269,7 +265,8 @@ impl TryFrom> for ExecutionBlockWithTransactions .transactions .iter() .map(|tx| Transaction::decode(&Rlp::new(tx))) - .collect::, _>>()?, + .collect::, _>>() + .unwrap_or_else(|_| Vec::new()), withdrawals: Vec::from(block.withdrawals) .into_iter() .map(|withdrawal| withdrawal.into()) diff --git a/beacon_node/execution_layer/src/engine_api/http.rs b/beacon_node/execution_layer/src/engine_api/http.rs index 3eb79d3163..c680617108 100644 --- a/beacon_node/execution_layer/src/engine_api/http.rs +++ b/beacon_node/execution_layer/src/engine_api/http.rs @@ -67,6 +67,7 @@ pub static LIGHTHOUSE_CAPABILITIES: &[&str] = &[ ENGINE_GET_PAYLOAD_V3, ENGINE_FORKCHOICE_UPDATED_V1, ENGINE_FORKCHOICE_UPDATED_V2, + ENGINE_FORKCHOICE_UPDATED_V3, ENGINE_GET_PAYLOAD_BODIES_BY_HASH_V1, ENGINE_GET_PAYLOAD_BODIES_BY_RANGE_V1, ]; @@ -74,7 +75,6 @@ pub static LIGHTHOUSE_CAPABILITIES: &[&str] = &[ /// This is necessary because a user might run a capella-enabled version of /// lighthouse before they update to a capella-enabled execution engine. // TODO (mark): rip this out once we are post-capella on mainnet -// TODO (sean): do we similarly need something like this for 4844? pub static PRE_CAPELLA_ENGINE_CAPABILITIES: EngineCapabilities = EngineCapabilities { new_payload_v1: true, new_payload_v2: false, diff --git a/beacon_node/execution_layer/src/engine_api/json_structures.rs b/beacon_node/execution_layer/src/engine_api/json_structures.rs index 3f4f1eb96b..f35cc7dc57 100644 --- a/beacon_node/execution_layer/src/engine_api/json_structures.rs +++ b/beacon_node/execution_layer/src/engine_api/json_structures.rs @@ -2,12 +2,11 @@ use super::*; use serde::{Deserialize, Serialize}; use strum::EnumString; use superstruct::superstruct; -use types::beacon_block_body::KzgCommitments; +use types::beacon_block_body::BuilderKzgCommitments; use types::blob_sidecar::BlobsList; use types::{ - BlobsBundle, EthSpec, ExecutionBlockHash, ExecutionPayload, ExecutionPayloadCapella, - ExecutionPayloadDeneb, ExecutionPayloadMerge, FixedVector, Transactions, Unsigned, - VariableList, Withdrawal, + EthSpec, ExecutionBlockHash, ExecutionPayload, ExecutionPayloadCapella, ExecutionPayloadDeneb, + ExecutionPayloadMerge, FixedVector, Transactions, Unsigned, VariableList, Withdrawal, }; #[derive(Debug, PartialEq, Serialize, Deserialize)] @@ -439,7 +438,7 @@ impl From for PayloadAttributes { #[derive(Debug, PartialEq, Serialize, Deserialize)] #[serde(bound = "E: EthSpec", rename_all = "camelCase")] pub struct JsonBlobsBundleV1 { - pub commitments: KzgCommitments, + pub commitments: BuilderKzgCommitments, pub proofs: KzgProofs, #[serde(with = "ssz_types::serde_utils::list_of_hex_fixed_vec")] pub blobs: BlobsList, diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 9850bd2d0c..a6c9465e16 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -15,7 +15,6 @@ use engines::{Engine, EngineError}; pub use engines::{EngineState, ForkchoiceState}; use eth2::types::{builder_bid::SignedBuilderBid, BlobsBundle, ForkVersionedResponse}; use eth2::types::{FullPayloadContents, SignedBlockContents}; -use ethers_core::abi::ethereum_types::FromStrRadixErr; use ethers_core::types::Transaction as EthersTransaction; use fork_choice::ForkchoiceUpdateParameters; use lru::LruCache; @@ -40,14 +39,14 @@ use tokio::{ }; use tokio_stream::wrappers::WatchStream; use tree_hash::TreeHash; -use types::beacon_block_body::KzgCommitments; -use types::blob_sidecar::BlobItems; +use types::beacon_block_body::{to_block_kzg_commitments, BlockBodyKzgCommitments}; use types::builder_bid::BuilderBid; +use types::sidecar::{BlobItems, Sidecar}; +use types::KzgProofs; use types::{ AbstractExecPayload, BeaconStateError, BlindedPayload, BlockType, ChainSpec, Epoch, ExecPayload, ExecutionPayloadCapella, ExecutionPayloadDeneb, ExecutionPayloadMerge, }; -use types::{KzgProofs, Sidecar}; use types::{ProposerPreparationData, PublicKeyBytes, Signature, Slot, Transaction}; mod block_hash; @@ -111,7 +110,9 @@ impl> TryFrom> .try_into() .map_err(|_| Error::InvalidPayloadConversion)?, block_value: builder_bid.value, - kzg_commitments: builder_bid.blinded_blobs_bundle.commitments, + kzg_commitments: to_block_kzg_commitments::( + builder_bid.blinded_blobs_bundle.commitments, + ), blobs: BlobItems::try_from_blob_roots(builder_bid.blinded_blobs_bundle.blob_roots) .map_err(Error::InvalidBlobConversion)?, proofs: builder_bid.blinded_blobs_bundle.proofs, @@ -167,7 +168,7 @@ pub enum BlockProposalContents> { PayloadAndBlobs { payload: Payload, block_value: Uint256, - kzg_commitments: KzgCommitments, + kzg_commitments: BlockBodyKzgCommitments, blobs: >::BlobItems, proofs: KzgProofs, }, @@ -184,7 +185,7 @@ impl> TryFrom> Some(bundle) => Ok(Self::PayloadAndBlobs { payload: execution_payload.into(), block_value, - kzg_commitments: bundle.commitments, + kzg_commitments: to_block_kzg_commitments::(bundle.commitments), blobs: BlobItems::try_from_blobs(bundle.blobs) .map_err(Error::InvalidBlobConversion)?, proofs: bundle.proofs, @@ -203,7 +204,7 @@ impl> BlockProposalContents ( Payload, - Option>, + Option>, Option<>::BlobItems>, Option>, ) { @@ -1792,10 +1793,10 @@ impl ExecutionLayer { VariableList::new( transactions .into_iter() - .map(ethers_tx_to_ssz::) - .collect::, BlobTxConversionError>>()?, + .map(|tx| VariableList::new(tx.rlp().to_vec())) + .collect::, ssz_types::Error>>()?, ) - .map_err(BlobTxConversionError::SszError) + .map_err(ApiError::SszError) }; let payload = match block { @@ -2142,81 +2143,12 @@ fn timestamp_now() -> u64 { .as_secs() } -#[derive(Debug)] -pub enum BlobTxConversionError { - /// The transaction type was not set. - NoTransactionType, - /// The transaction chain ID was not set. - NoChainId, - /// The transaction nonce was too large to fit in a `u64`. - NonceTooLarge, - /// The transaction gas was too large to fit in a `u64`. - GasTooHigh, - /// Missing the `max_fee_per_gas` field. - MaxFeePerGasMissing, - /// Missing the `max_priority_fee_per_gas` field. - MaxPriorityFeePerGasMissing, - /// Missing the `access_list` field. - AccessListMissing, - /// Missing the `max_fee_per_data_gas` field. - MaxFeePerDataGasMissing, - /// Missing the `versioned_hashes` field. - VersionedHashesMissing, - /// `y_parity` field was greater than one. - InvalidYParity, - /// There was an error converting the transaction to SSZ. - SszError(ssz_types::Error), - /// There was an error converting the transaction from JSON. - SerdeJson(serde_json::Error), - /// There was an error converting the transaction from hex. - FromHex(String), - /// There was an error converting the transaction from hex. - FromStrRadix(FromStrRadixErr), - /// A `versioned_hash` did not contain 32 bytes. - InvalidVersionedHashBytesLen, -} - -impl From for BlobTxConversionError { - fn from(value: ssz_types::Error) -> Self { - Self::SszError(value) - } -} - -impl From for BlobTxConversionError { - fn from(value: serde_json::Error) -> Self { - Self::SerdeJson(value) - } -} - -fn random_valid_tx( -) -> Result, BlobTxConversionError> { - // Calculate transaction bytes. We don't care about the contents of the transaction. - let transaction: EthersTransaction = serde_json::from_str( - r#"{ - "blockHash":"0x1d59ff54b1eb26b013ce3cb5fc9dab3705b415a67127a003c3e61eb445bb8df2", - "blockNumber":"0x5daf3b", - "from":"0xa7d9ddbe1f17865597fbd27ec712455208b6b76d", - "gas":"0xc350", - "gasPrice":"0x4a817c800", - "hash":"0x88df016429689c079f3b2f6ad39fa052532c56795b733da78a91ebe6a713944b", - "input":"0x68656c6c6f21", - "nonce":"0x15", - "to":"0xf02c1c8e6114b1dbe8937a39260b5b0a374432bb", - "transactionIndex":"0x41", - "value":"0xf3dbb76162000", - "v":"0x25", - "r":"0x1b5e176d927f8e9ab405058b2d2457392da3e20f328b16ddabcebc33eaac5fea", - "s":"0x4ba69724e8f69de52f0125ad8b3c5c2cef33019bac3249e2c0a2192766d1721c" - }"#, - ) - .unwrap(); - ethers_tx_to_ssz::(transaction) -} - -fn ethers_tx_to_ssz( - tx: EthersTransaction, -) -> Result, BlobTxConversionError> { - VariableList::new(tx.rlp().to_vec()).map_err(Into::into) +fn static_valid_tx() -> Result, String> { + // This is a real transaction hex encoded, but we don't care about the contents of the transaction. + let bytes = hex::decode( + "b87502f872041a8459682f008459682f0d8252089461815774383099e24810ab832a5b2a5425c154d58829a2241af62c000080c001a059e6b67f48fb32e7e570dfb11e042b5ad2e55e3ce3ce9cd989c7e06e07feeafda0016b83f4f980694ed2eee4d10667242b1f40dc406901b34125b008d334d47469" + ).map_err(|e| format!("Failed to decode transaction bytes: {:?}", e))?; + VariableList::new(bytes).map_err(|e| format!("Failed to convert transaction to SSZ: {:?}", e)) } fn noop( diff --git a/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs b/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs index af9bc266aa..444f935331 100644 --- a/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs +++ b/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs @@ -6,8 +6,9 @@ use crate::{ }, ExecutionBlock, PayloadAttributes, PayloadId, PayloadStatusV1, PayloadStatusV1Status, }, - random_valid_tx, ExecutionBlockWithTransactions, + static_valid_tx, ExecutionBlockWithTransactions, }; +use eth2::types::BlobsBundle; use kzg::Kzg; use rand::thread_rng; use serde::{Deserialize, Serialize}; @@ -16,9 +17,9 @@ use std::sync::Arc; use tree_hash::TreeHash; use tree_hash_derive::TreeHash; use types::{ - BlobSidecar, BlobsBundle, ChainSpec, EthSpec, ExecutionBlockHash, ExecutionPayload, - ExecutionPayloadCapella, ExecutionPayloadDeneb, ExecutionPayloadHeader, ExecutionPayloadMerge, - ForkName, Hash256, Transactions, Uint256, + BlobSidecar, ChainSpec, EthSpec, ExecutionBlockHash, ExecutionPayload, ExecutionPayloadCapella, + ExecutionPayloadDeneb, ExecutionPayloadHeader, ExecutionPayloadMerge, ForkName, Hash256, + Transactions, Uint256, }; use super::DEFAULT_TERMINAL_BLOCK; @@ -643,7 +644,7 @@ pub fn generate_random_blobs( .. } = random_valid_sidecar; - let tx = random_valid_tx::() + let tx = static_valid_tx::() .map_err(|e| format!("error creating valid tx SSZ bytes: {:?}", e))?; transactions.push(tx); diff --git a/beacon_node/execution_layer/src/test_utils/mock_builder.rs b/beacon_node/execution_layer/src/test_utils/mock_builder.rs index 4c62e84abe..ee6854799e 100644 --- a/beacon_node/execution_layer/src/test_utils/mock_builder.rs +++ b/beacon_node/execution_layer/src/test_utils/mock_builder.rs @@ -1,7 +1,7 @@ use crate::test_utils::{DEFAULT_BUILDER_PAYLOAD_VALUE_WEI, DEFAULT_JWT_SECRET}; use crate::{Config, ExecutionLayer, PayloadAttributes}; use async_trait::async_trait; -use eth2::types::{BlockId, StateId, ValidatorId}; +use eth2::types::{BlobsBundle, BlockId, StateId, ValidatorId}; use eth2::{BeaconNodeHttpClient, Timeouts}; pub use ethereum_consensus::state_transition::Context; use ethereum_consensus::{ @@ -38,7 +38,7 @@ use tempfile::NamedTempFile; use tree_hash::TreeHash; use types::builder_bid::BlindedBlobsBundle; use types::{ - Address, BeaconState, BlobsBundle, ChainSpec, EthSpec, ExecPayload, ExecutionPayload, + Address, BeaconState, ChainSpec, EthSpec, ExecPayload, ExecutionPayload, ExecutionPayloadHeader, ForkName, Hash256, Slot, Uint256, }; diff --git a/beacon_node/http_api/src/metrics.rs b/beacon_node/http_api/src/metrics.rs index 32a4150365..26ee183c83 100644 --- a/beacon_node/http_api/src/metrics.rs +++ b/beacon_node/http_api/src/metrics.rs @@ -42,16 +42,4 @@ lazy_static::lazy_static! { "http_api_block_published_very_late_total", "The count of times a block was published beyond the attestation deadline" ); - pub static ref HTTP_API_BLOB_BROADCAST_DELAY_TIMES: Result = try_create_histogram( - "http_api_blob_broadcast_delay_times", - "Time between start of the slot and when the blob was broadcast" - ); - pub static ref HTTP_API_BLOB_PUBLISHED_LATE_TOTAL: Result = try_create_int_counter( - "http_api_blob_published_late_total", - "The count of times a blob was published beyond more than half way to the attestation deadline" - ); - pub static ref HTTP_API_BLOB_PUBLISHED_VERY_LATE_TOTAL: Result = try_create_int_counter( - "http_api_blob_published_very_late_total", - "The count of times a blob was published beyond the attestation deadline" - ); } diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 3fe1f37c97..f8babd8f32 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -227,12 +227,16 @@ pub async fn publish_block { let msg = format!("Missing parts of block with root {:?}", block_root); - error!( - log, - "Invalid block provided to HTTP API"; - "reason" => &msg - ); - Err(warp_utils::reject::broadcast_without_import(msg)) + if let BroadcastValidation::Gossip = validation_level { + Err(warp_utils::reject::broadcast_without_import(msg)) + } else { + error!( + log, + "Invalid block provided to HTTP API"; + "reason" => &msg + ); + Err(warp_utils::reject::broadcast_without_import(msg)) + } } Err(BlockError::BeaconChainError(BeaconChainError::UnableToPublish)) => { Err(warp_utils::reject::custom_server_error( diff --git a/beacon_node/http_api/tests/status_tests.rs b/beacon_node/http_api/tests/status_tests.rs index b3d1e9daa8..d37026d406 100644 --- a/beacon_node/http_api/tests/status_tests.rs +++ b/beacon_node/http_api/tests/status_tests.rs @@ -13,12 +13,7 @@ type E = MinimalEthSpec; /// Create a new test environment that is post-merge with `chain_depth` blocks. async fn post_merge_tester(chain_depth: u64, validator_count: u64) -> InteractiveTester { // Test using latest fork so that we simulate conditions as similar to mainnet as possible. - // TODO(jimmy): We should change this back to `latest()`. These tests currently fail on Deneb because: - // 1. KZG library doesn't support Minimal spec, changing to Mainnet spec fixes some tests; BUT - // 2. `harness.process_block_result` in the test below panics due to - // `AvailabilityProcessingStatus::PendingBlobs`, and there seems to be some race - // condition going on, because the test passes if I step through the code in debug. - let mut spec = ForkName::Capella.make_genesis_spec(E::default_spec()); + let mut spec = ForkName::latest().make_genesis_spec(E::default_spec()); spec.terminal_total_difficulty = 1.into(); let tester = InteractiveTester::::new(Some(spec), validator_count as usize).await; @@ -108,7 +103,7 @@ async fn el_error_on_new_payload() { let (block_contents, _) = harness .make_block(pre_state, Slot::new(num_blocks + 1)) .await; - let block = block_contents.0; + let (block, blobs) = block_contents; let block_hash = block .message() .body() @@ -124,7 +119,9 @@ async fn el_error_on_new_payload() { // Attempt to process the block, which should error. harness.advance_slot(); assert!(matches!( - harness.process_block_result((block.clone(), None)).await, + harness + .process_block_result((block.clone(), blobs.clone())) + .await, Err(BlockError::ExecutionPayloadError(_)) )); @@ -143,7 +140,7 @@ async fn el_error_on_new_payload() { validation_error: None, }, ); - harness.process_block_result((block, None)).await.unwrap(); + harness.process_block_result((block, blobs)).await.unwrap(); let api_response = tester.client.get_node_syncing().await.unwrap().data; assert_eq!(api_response.el_offline, Some(false)); diff --git a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs index fddf5ab842..7a4efb7389 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs @@ -73,7 +73,7 @@ impl Encoder> for SSZSnappyInboundCodec< RPCResponse::BlocksByRange(res) => res.as_ssz_bytes(), RPCResponse::BlocksByRoot(res) => res.as_ssz_bytes(), RPCResponse::BlobsByRange(res) => res.as_ssz_bytes(), - RPCResponse::SidecarByRoot(res) => res.as_ssz_bytes(), + RPCResponse::BlobsByRoot(res) => res.as_ssz_bytes(), RPCResponse::LightClientBootstrap(res) => res.as_ssz_bytes(), RPCResponse::Pong(res) => res.data.as_ssz_bytes(), RPCResponse::MetaData(res) => @@ -421,7 +421,7 @@ fn context_bytes( SignedBeaconBlock::Base { .. } => Some(fork_context.genesis_context_bytes()), }; } - if let RPCResponse::BlobsByRange(_) | RPCResponse::SidecarByRoot(_) = rpc_variant { + if let RPCResponse::BlobsByRange(_) | RPCResponse::BlobsByRoot(_) = rpc_variant { return fork_context.to_context_bytes(ForkName::Deneb); } } @@ -563,7 +563,7 @@ fn handle_rpc_response( )), }, SupportedProtocol::BlobsByRootV1 => match fork_name { - Some(ForkName::Deneb) => Ok(Some(RPCResponse::SidecarByRoot(Arc::new( + Some(ForkName::Deneb) => Ok(Some(RPCResponse::BlobsByRoot(Arc::new( BlobSidecar::from_ssz_bytes(decoded_buffer)?, )))), Some(_) => Err(RPCError::ErrorResponse( @@ -1058,11 +1058,11 @@ mod tests { assert_eq!( encode_then_decode_response( SupportedProtocol::BlobsByRootV1, - RPCCodedResponse::Success(RPCResponse::SidecarByRoot(default_blob_sidecar())), + RPCCodedResponse::Success(RPCResponse::BlobsByRoot(default_blob_sidecar())), ForkName::Deneb, &chain_spec ), - Ok(Some(RPCResponse::SidecarByRoot(default_blob_sidecar()))), + Ok(Some(RPCResponse::BlobsByRoot(default_blob_sidecar()))), ); } diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 70fb94a66c..2148ece569 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use strum::IntoStaticStr; use superstruct::superstruct; use types::blob_sidecar::BlobIdentifier; +use types::consts::deneb::MAX_BLOBS_PER_BLOCK; use types::{ blob_sidecar::BlobSidecar, light_client_bootstrap::LightClientBootstrap, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot, @@ -31,12 +32,8 @@ pub const MAX_ERROR_LEN: u64 = 256; pub type MaxRequestBlocksDeneb = U128; pub const MAX_REQUEST_BLOCKS_DENEB: u64 = 128; -// TODO: this is calculated as MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK and -// MAX_BLOBS_PER_BLOCK comes from the spec. -// MAX_REQUEST_BLOCKS_DENEB = 128 -// MAX_BLOBS_PER_BLOCK = 6 pub type MaxRequestBlobSidecars = U768; -pub const MAX_REQUEST_BLOB_SIDECARS: u64 = 768; +pub const MAX_REQUEST_BLOB_SIDECARS: u64 = MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK; /// Wrapper over SSZ List to represent error message in rpc responses. #[derive(Debug, Clone)] @@ -390,7 +387,7 @@ pub enum RPCResponse { LightClientBootstrap(LightClientBootstrap), /// A response to a get BLOBS_BY_ROOT request. - SidecarByRoot(Arc>), + BlobsByRoot(Arc>), /// A PONG response to a PING request. Pong(Ping), @@ -483,7 +480,7 @@ impl RPCCodedResponse { RPCResponse::BlocksByRange(_) => true, RPCResponse::BlocksByRoot(_) => true, RPCResponse::BlobsByRange(_) => true, - RPCResponse::SidecarByRoot(_) => true, + RPCResponse::BlobsByRoot(_) => true, RPCResponse::Pong(_) => false, RPCResponse::MetaData(_) => false, RPCResponse::LightClientBootstrap(_) => false, @@ -521,7 +518,7 @@ impl RPCResponse { RPCResponse::BlocksByRange(_) => Protocol::BlocksByRange, RPCResponse::BlocksByRoot(_) => Protocol::BlocksByRoot, RPCResponse::BlobsByRange(_) => Protocol::BlobsByRange, - RPCResponse::SidecarByRoot(_) => Protocol::BlobsByRoot, + RPCResponse::BlobsByRoot(_) => Protocol::BlobsByRoot, RPCResponse::Pong(_) => Protocol::Ping, RPCResponse::MetaData(_) => Protocol::MetaData, RPCResponse::LightClientBootstrap(_) => Protocol::LightClientBootstrap, @@ -562,7 +559,7 @@ impl std::fmt::Display for RPCResponse { RPCResponse::BlobsByRange(blob) => { write!(f, "BlobsByRange: Blob slot: {}", blob.slot) } - RPCResponse::SidecarByRoot(sidecar) => { + RPCResponse::BlobsByRoot(sidecar) => { write!(f, "BlobsByRoot: Blob slot: {}", sidecar.slot) } RPCResponse::Pong(ping) => write!(f, "Pong: {}", ping.data), diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index c22a2d8b7a..cdc7e4d74d 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -277,8 +277,8 @@ impl SupportedProtocol { } } - fn currently_supported() -> Vec { - vec![ + fn currently_supported(fork_context: &ForkContext) -> Vec { + let mut supported = vec![ ProtocolId::new(Self::StatusV1, Encoding::SSZSnappy), ProtocolId::new(Self::GoodbyeV1, Encoding::SSZSnappy), // V2 variants have higher preference then V1 @@ -286,12 +286,17 @@ impl SupportedProtocol { ProtocolId::new(Self::BlocksByRangeV1, Encoding::SSZSnappy), ProtocolId::new(Self::BlocksByRootV2, Encoding::SSZSnappy), ProtocolId::new(Self::BlocksByRootV1, Encoding::SSZSnappy), - ProtocolId::new(Self::BlobsByRangeV1, Encoding::SSZSnappy), - ProtocolId::new(Self::BlobsByRootV1, Encoding::SSZSnappy), ProtocolId::new(Self::PingV1, Encoding::SSZSnappy), ProtocolId::new(Self::MetaDataV2, Encoding::SSZSnappy), ProtocolId::new(Self::MetaDataV1, Encoding::SSZSnappy), - ] + ]; + if fork_context.fork_exists(ForkName::Deneb) { + supported.extend_from_slice(&[ + ProtocolId::new(SupportedProtocol::BlobsByRootV1, Encoding::SSZSnappy), + ProtocolId::new(SupportedProtocol::BlobsByRangeV1, Encoding::SSZSnappy), + ]); + } + supported } } @@ -319,14 +324,7 @@ impl UpgradeInfo for RPCProtocol { /// The list of supported RPC protocols for Lighthouse. fn protocol_info(&self) -> Self::InfoIter { - let mut supported_protocols = SupportedProtocol::currently_supported(); - - if let ForkName::Deneb = self.fork_context.current_fork() { - supported_protocols.extend_from_slice(&[ - ProtocolId::new(SupportedProtocol::BlobsByRootV1, Encoding::SSZSnappy), - ProtocolId::new(SupportedProtocol::BlobsByRangeV1, Encoding::SSZSnappy), - ]); - } + let mut supported_protocols = SupportedProtocol::currently_supported(&self.fork_context); if self.enable_light_client_server { supported_protocols.push(ProtocolId::new( SupportedProtocol::LightClientBootstrapV1, diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index b75dcc2a08..6ede0666a3 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -91,10 +91,10 @@ pub enum Response { BlobsByRange(Option>>), /// A response to a get BLOCKS_BY_ROOT request. BlocksByRoot(Option>>), - /// A response to a LightClientUpdate request. - LightClientBootstrap(LightClientBootstrap), /// A response to a get BLOBS_BY_ROOT request. BlobsByRoot(Option>>), + /// A response to a LightClientUpdate request. + LightClientBootstrap(LightClientBootstrap), } impl std::convert::From> for RPCCodedResponse { @@ -109,7 +109,7 @@ impl std::convert::From> for RPCCodedResponse RPCCodedResponse::StreamTermination(ResponseTermination::BlocksByRange), }, Response::BlobsByRoot(r) => match r { - Some(b) => RPCCodedResponse::Success(RPCResponse::SidecarByRoot(b)), + Some(b) => RPCCodedResponse::Success(RPCResponse::BlobsByRoot(b)), None => RPCCodedResponse::StreamTermination(ResponseTermination::BlobsByRoot), }, Response::BlobsByRange(r) => match r { diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index ee03cc88a6..a97e09acd0 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -1356,7 +1356,7 @@ impl Network { RPCResponse::BlocksByRoot(resp) => { self.build_response(id, peer_id, Response::BlocksByRoot(Some(resp))) } - RPCResponse::SidecarByRoot(resp) => { + RPCResponse::BlobsByRoot(resp) => { self.build_response(id, peer_id, Response::BlobsByRoot(Some(resp))) } // Should never be reached diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index 7e1dcca495..92ebbbea6f 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -11,6 +11,7 @@ matches = "0.1.8" exit-future = "0.2.0" slog-term = "2.6.0" slog-async = "2.5.0" +eth2 = {path="../../common/eth2"} [dependencies] beacon_chain = { path = "../beacon_chain" } diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index 73b98e210f..ee0d5e6234 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -71,10 +71,6 @@ lazy_static! { "beacon_processor_gossip_blob_verified_total", "Total number of gossip blob verified for propagation." ); - pub static ref BEACON_PROCESSOR_GOSSIP_BLOB_IMPORTED_TOTAL: Result = try_create_int_counter( - "beacon_processor_gossip_blob_imported_total", - "Total number of gossip blobs imported to fork choice, etc." - ); // Gossip Exits. pub static ref BEACON_PROCESSOR_EXIT_VERIFIED_TOTAL: Result = try_create_int_counter( "beacon_processor_exit_verified_total", @@ -120,10 +116,6 @@ lazy_static! { "beacon_processor_rpc_block_imported_total", "Total number of gossip blocks imported to fork choice, etc." ); - pub static ref BEACON_PROCESSOR_RPC_BLOB_IMPORTED_TOTAL: Result = try_create_int_counter( - "beacon_processor_rpc_blob_imported_total", - "Total number of gossip blobs imported." - ); // Chain segments. pub static ref BEACON_PROCESSOR_CHAIN_SEGMENT_SUCCESS_TOTAL: Result = try_create_int_counter( "beacon_processor_chain_segment_success_total", diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 4b2a11a567..2821498856 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -621,6 +621,20 @@ impl NetworkBeaconProcessor { .verify_blob_sidecar_for_gossip(signed_blob, blob_index) { Ok(gossip_verified_blob) => { + metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOB_VERIFIED_TOTAL); + + if delay >= self.chain.slot_clock.unagg_attestation_production_delay() { + metrics::inc_counter(&metrics::BEACON_BLOB_GOSSIP_ARRIVED_LATE_TOTAL); + debug!( + self.log, + "Gossip blob arrived late"; + "block_root" => ?gossip_verified_blob.block_root(), + "proposer_index" => gossip_verified_blob.proposer_index(), + "slot" => gossip_verified_blob.slot(), + "delay" => ?delay, + ); + } + debug!( self.log, "Successfully verified gossip blob"; @@ -628,8 +642,20 @@ impl NetworkBeaconProcessor { "root" => %root, "index" => %index ); - metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOB_VERIFIED_TOTAL); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); + + // Log metrics to keep track of propagation delay times. + if let Some(duration) = SystemTime::now() + .duration_since(UNIX_EPOCH) + .ok() + .and_then(|now| now.checked_sub(seen_duration)) + { + metrics::observe_duration( + &metrics::BEACON_BLOB_GOSSIP_PROPAGATION_VERIFICATION_DELAY_TIME, + duration, + ); + } self.process_gossip_verified_blob(peer_id, gossip_verified_blob, seen_duration) .await } diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index d70db92499..222b9c8fd4 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -18,7 +18,9 @@ use std::sync::Arc; use task_executor::TaskExecutor; use tokio_stream::StreamExt; use types::blob_sidecar::BlobIdentifier; -use types::{light_client_bootstrap::LightClientBootstrap, Epoch, EthSpec, Hash256, Slot}; +use types::{ + light_client_bootstrap::LightClientBootstrap, Epoch, EthSpec, ForkName, Hash256, Slot, +}; impl NetworkBeaconProcessor { /* Auxiliary functions */ @@ -376,13 +378,19 @@ impl NetworkBeaconProcessor { ); // Should not send more than max request blocks - // TODO: We should switch the limit to `MAX_REQUEST_BLOCKS` at the fork, - // or maybe consider switching the max value given the fork context. - if *req.count() > MAX_REQUEST_BLOCKS_DENEB { + let max_request_size = self.chain.epoch().map_or(MAX_REQUEST_BLOCKS, |epoch| { + match self.chain.spec.fork_name_at_epoch(epoch) { + ForkName::Deneb => MAX_REQUEST_BLOCKS_DENEB, + ForkName::Base | ForkName::Altair | ForkName::Merge | ForkName::Capella => { + MAX_REQUEST_BLOCKS + } + } + }); + if *req.count() > max_request_size { return self.send_error_response( peer_id, RPCResponseErrorCode::InvalidRequest, - "Request exceeded `MAX_REQUEST_BLOCKS_DENEB`".into(), + format!("Request exceeded max size {max_request_size}"), request_id, ); } @@ -425,17 +433,7 @@ impl NetworkBeaconProcessor { }; // Pick out the required blocks, ignoring skip-slots. - let mut last_block_root = req - .start_slot() - .checked_sub(1) - .map(|prev_slot| { - self.chain - .block_root_at_slot(Slot::new(prev_slot), WhenSlotSkipped::Prev) - }) - .transpose() - .ok() - .flatten() - .flatten(); + let mut last_block_root = None; let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { iter.take_while(|(_, slot)| { slot.as_u64() < req.start_slot().saturating_add(*req.count()) @@ -714,17 +712,12 @@ impl NetworkBeaconProcessor { }; // Pick out the required blocks, ignoring skip-slots. - let mut last_block_root = req - .start_slot - .checked_sub(1) - .map(|prev_slot| { - self.chain - .block_root_at_slot(Slot::new(prev_slot), WhenSlotSkipped::Prev) - }) - .transpose() - .ok() - .flatten() - .flatten(); + let mut last_block_root = req.start_slot.checked_sub(1).and_then(|prev_slot| { + self.chain + .block_root_at_slot(Slot::new(prev_slot), WhenSlotSkipped::Prev) + .ok() + .flatten() + }); let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count)) // map skip slots to None diff --git a/beacon_node/network/src/subnet_service/attestation_subnets.rs b/beacon_node/network/src/subnet_service/attestation_subnets.rs index 02313efbf9..b4f52df39d 100644 --- a/beacon_node/network/src/subnet_service/attestation_subnets.rs +++ b/beacon_node/network/src/subnet_service/attestation_subnets.rs @@ -151,7 +151,7 @@ impl AttestationService { } /// Return count of all currently subscribed subnets (long-lived **and** short-lived). - #[cfg(all(test, feature = "spec-mainnet"))] + #[cfg(test)] pub fn subscription_count(&self) -> usize { if self.subscribe_all_subnets { self.beacon_chain.spec.attestation_subnet_count as usize @@ -167,7 +167,7 @@ impl AttestationService { } /// Returns whether we are subscribed to a subnet for testing purposes. - #[cfg(all(test, feature = "spec-mainnet"))] + #[cfg(test)] pub(crate) fn is_subscribed( &self, subnet_id: &SubnetId, @@ -179,7 +179,7 @@ impl AttestationService { } } - #[cfg(all(test, feature = "spec-mainnet"))] + #[cfg(test)] pub(crate) fn long_lived_subscriptions(&self) -> &HashSet { &self.long_lived_subscriptions } diff --git a/beacon_node/network/src/subnet_service/sync_subnets.rs b/beacon_node/network/src/subnet_service/sync_subnets.rs index 982962b6ba..eda7ce8efb 100644 --- a/beacon_node/network/src/subnet_service/sync_subnets.rs +++ b/beacon_node/network/src/subnet_service/sync_subnets.rs @@ -91,7 +91,7 @@ impl SyncCommitteeService { } /// Return count of all currently subscribed subnets. - #[cfg(all(test, feature = "spec-mainnet"))] + #[cfg(test)] pub fn subscription_count(&self) -> usize { use types::consts::altair::SYNC_COMMITTEE_SUBNET_COUNT; if self.subscribe_all_subnets { diff --git a/beacon_node/network/src/subnet_service/tests/mod.rs b/beacon_node/network/src/subnet_service/tests/mod.rs index 58a882b4d8..3b8c89a442 100644 --- a/beacon_node/network/src/subnet_service/tests/mod.rs +++ b/beacon_node/network/src/subnet_service/tests/mod.rs @@ -1,4 +1,3 @@ -#![cfg(feature = "spec-mainnet")] use super::*; use beacon_chain::{ builder::{BeaconChainBuilder, Witness}, diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 660e693d2e..ec88ffb162 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -21,8 +21,8 @@ use types::beacon_block_body::to_block_kzg_commitments; use types::{ map_fork_name, map_fork_name_with, test_utils::{SeedableRng, TestRandom, XorShiftRng}, - BeaconBlock, BlobSidecar, BlobsBundle, EthSpec, ForkName, FullPayloadDeneb, - MinimalEthSpec as E, SignedBeaconBlock, + BeaconBlock, BlobSidecar, EthSpec, ForkName, FullPayloadDeneb, MinimalEthSpec as E, + SignedBeaconBlock, }; type T = Witness, E, MemoryStore, MemoryStore>; @@ -127,7 +127,7 @@ impl TestRig { message.body.blob_kzg_commitments = to_block_kzg_commitments::(bundle.commitments.clone()); - let BlobsBundle { + let eth2::types::BlobsBundle { commitments, proofs, blobs, diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index dda30a1bf3..b68d054bb4 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -1108,8 +1108,12 @@ impl SyncManager { self.log, "Blocks and blobs request for range received invalid data"; "peer_id" => %peer_id, "batch_id" => resp.batch_id, "error" => e ); - // TODO: penalize the peer for being a bad boy let id = RequestId::RangeBlockAndBlobs { id }; + self.network.report_peer( + peer_id, + PeerAction::MidToleranceError, + "block_blob_faulty_batch", + ); self.inject_error(peer_id, id, RPCError::InvalidData(e.into())) } } @@ -1160,8 +1164,12 @@ impl SyncManager { self.log, "Blocks and blobs request for backfill received invalid data"; "peer_id" => %peer_id, "batch_id" => resp.batch_id, "error" => e ); - // TODO: penalize the peer for being a bad boy let id = RequestId::BackFillBlockAndBlobs { id }; + self.network.report_peer( + peer_id, + PeerAction::MidToleranceError, + "block_blob_faulty_backfill_batch", + ); self.inject_error(peer_id, id, RPCError::InvalidData(e.into())) } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 4b75f56815..f7779cb76d 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -537,7 +537,7 @@ impl SyncNetworkContext { &self.network_beacon_processor } - pub(crate) fn next_id(&mut self) -> Id { + pub fn next_id(&mut self) -> Id { let id = self.request_id; self.request_id += 1; id @@ -545,7 +545,6 @@ impl SyncNetworkContext { /// Check whether a batch for this epoch (and only this epoch) should request just blocks or /// blocks and blobs. - #[allow(unused)] pub fn batch_type(&self, epoch: types::Epoch) -> ByRangeRequestType { // Induces a compile time panic if this doesn't hold true. #[allow(clippy::assertions_on_constants)] @@ -555,12 +554,6 @@ impl SyncNetworkContext { "To deal with alignment with deneb boundaries, batches need to be of just one epoch" ); - #[cfg(test)] - { - // Keep tests only for blocks. - ByRangeRequestType::Blocks - } - #[cfg(not(test))] if let Some(data_availability_boundary) = self.chain.data_availability_boundary() { if epoch >= data_availability_boundary { ByRangeRequestType::BlocksAndBlobs diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 4ca518f989..72bb4d3be7 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -384,14 +384,13 @@ mod tests { use crate::NetworkMessage; use super::*; + use crate::sync::network_context::BlockOrBlob; use beacon_chain::builder::Witness; use beacon_chain::eth1_chain::CachingEth1Backend; use beacon_chain::parking_lot::RwLock; use beacon_chain::test_utils::{BeaconChainHarness, EphemeralHarnessType}; use beacon_chain::EngineState; use beacon_processor::WorkEvent as BeaconWorkEvent; - use lighthouse_network::rpc::BlocksByRangeRequest; - use lighthouse_network::Request; use lighthouse_network::{rpc::StatusMessage, NetworkGlobals}; use slog::{o, Drain}; use slot_clock::TestingSlotClock; @@ -399,7 +398,7 @@ mod tests { use std::sync::Arc; use store::MemoryStore; use tokio::sync::mpsc; - use types::{Hash256, MinimalEthSpec as E}; + use types::{ForkName, Hash256, MinimalEthSpec as E}; #[derive(Debug)] struct FakeStorage { @@ -515,18 +514,39 @@ mod tests { /// Reads an BlocksByRange request to a given peer from the network receiver channel. #[track_caller] - fn grab_request(&mut self, expected_peer: &PeerId) -> (RequestId, BlocksByRangeRequest) { - if let Ok(NetworkMessage::SendRequest { + fn grab_request( + &mut self, + expected_peer: &PeerId, + fork_name: ForkName, + ) -> (RequestId, Option) { + let block_req_id = if let Ok(NetworkMessage::SendRequest { peer_id, - request: Request::BlocksByRange(request), + request: _, request_id, }) = self.network_rx.try_recv() { assert_eq!(&peer_id, expected_peer); - (request_id, request) + request_id } else { panic!("Should have sent a batch request to the peer") - } + }; + let blob_req_id = match fork_name { + ForkName::Deneb => { + if let Ok(NetworkMessage::SendRequest { + peer_id, + request: _, + request_id, + }) = self.network_rx.try_recv() + { + assert_eq!(&peer_id, expected_peer); + Some(request_id) + } else { + panic!("Should have sent a batch request to the peer") + } + } + _ => None, + }; + (block_req_id, blob_req_id) } /// Produce a head peer @@ -646,8 +666,14 @@ mod tests { range.add_peer(&mut rig.cx, local_info, head_peer, remote_info); range.assert_state(RangeSyncType::Head); + let fork = rig + .cx + .chain + .spec + .fork_name_at_epoch(rig.cx.chain.epoch().unwrap()); + // Sync should have requested a batch, grab the request. - let _request = rig.grab_request(&head_peer); + let _ = rig.grab_request(&head_peer, fork); // Now get a peer with an advanced finalized epoch. let (finalized_peer, local_info, remote_info) = rig.finalized_peer(); @@ -655,7 +681,7 @@ mod tests { range.assert_state(RangeSyncType::Finalized); // Sync should have requested a batch, grab the request - let _second_request = rig.grab_request(&finalized_peer); + let _ = rig.grab_request(&finalized_peer, fork); // Fail the head chain by disconnecting the peer. range.remove_peer(&mut rig.cx, &head_peer); @@ -673,8 +699,14 @@ mod tests { range.add_peer(&mut rig.cx, local_info, head_peer, head_info); range.assert_state(RangeSyncType::Head); + let fork = rig + .cx + .chain + .spec + .fork_name_at_epoch(rig.cx.chain.epoch().unwrap()); + // Sync should have requested a batch, grab the request. - let _request = rig.grab_request(&head_peer); + let _ = rig.grab_request(&head_peer, fork); // Now get a peer with an advanced finalized epoch. let (finalized_peer, local_info, remote_info) = rig.finalized_peer(); @@ -683,7 +715,7 @@ mod tests { range.assert_state(RangeSyncType::Finalized); // Sync should have requested a batch, grab the request - let _second_request = rig.grab_request(&finalized_peer); + let _ = rig.grab_request(&finalized_peer, fork); // Now the chain knows both chains target roots. rig.chain.remember_block(head_peer_root); @@ -697,15 +729,39 @@ mod tests { #[test] fn pause_and_resume_on_ee_offline() { let (mut rig, mut range) = range(true); + let fork = rig + .cx + .chain + .spec + .fork_name_at_epoch(rig.cx.chain.epoch().unwrap()); // add some peers let (peer1, local_info, head_info) = rig.head_peer(); range.add_peer(&mut rig.cx, local_info, peer1, head_info); - let ((chain1, batch1), id1) = match rig.grab_request(&peer1).0 { - RequestId::Sync(crate::sync::manager::RequestId::RangeBlocks { id }) => { - (rig.cx.range_sync_block_only_response(id, true).unwrap(), id) + let (block_req, blob_req_opt) = rig.grab_request(&peer1, fork); + + let (chain1, batch1, id1) = if blob_req_opt.is_some() { + match block_req { + RequestId::Sync(crate::sync::manager::RequestId::RangeBlockAndBlobs { id }) => { + let _ = rig + .cx + .range_sync_block_and_blob_response(id, BlockOrBlob::Block(None)); + let (chain1, response) = rig + .cx + .range_sync_block_and_blob_response(id, BlockOrBlob::Blob(None)) + .unwrap(); + (chain1, response.batch_id, id) + } + other => panic!("unexpected request {:?}", other), + } + } else { + match block_req { + RequestId::Sync(crate::sync::manager::RequestId::RangeBlocks { id }) => { + let (chain, batch) = rig.cx.range_sync_block_only_response(id, true).unwrap(); + (chain, batch, id) + } + other => panic!("unexpected request {:?}", other), } - other => panic!("unexpected request {:?}", other), }; // make the ee offline @@ -720,11 +776,30 @@ mod tests { // while the ee is offline, more peers might arrive. Add a new finalized peer. let (peer2, local_info, finalized_info) = rig.finalized_peer(); range.add_peer(&mut rig.cx, local_info, peer2, finalized_info); - let ((chain2, batch2), id2) = match rig.grab_request(&peer2).0 { - RequestId::Sync(crate::sync::manager::RequestId::RangeBlocks { id }) => { - (rig.cx.range_sync_block_only_response(id, true).unwrap(), id) + let (block_req, blob_req_opt) = rig.grab_request(&peer2, fork); + + let (chain2, batch2, id2) = if blob_req_opt.is_some() { + match block_req { + RequestId::Sync(crate::sync::manager::RequestId::RangeBlockAndBlobs { id }) => { + let _ = rig + .cx + .range_sync_block_and_blob_response(id, BlockOrBlob::Block(None)); + let (chain2, response) = rig + .cx + .range_sync_block_and_blob_response(id, BlockOrBlob::Blob(None)) + .unwrap(); + (chain2, response.batch_id, id) + } + other => panic!("unexpected request {:?}", other), + } + } else { + match block_req { + RequestId::Sync(crate::sync::manager::RequestId::RangeBlocks { id }) => { + let (chain, batch) = rig.cx.range_sync_block_only_response(id, true).unwrap(); + (chain, batch, id) + } + other => panic!("unexpected request {:?}", other), } - other => panic!("unexpected request {:?}", other), }; // send the response to the request diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index c6d7da4e6a..f112bfa732 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -666,7 +666,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .default_value("1") .takes_value(true) ) - /* 4844 settings */ + /* Deneb settings */ .arg( Arg::with_name("trusted-setup-file-override") .long("trusted-setup-file-override") @@ -709,6 +709,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .arg( Arg::with_name("prune-blobs") .long("prune-blobs") + .value_name("BOOLEAN") .help("Prune blobs from Lighthouse's database when they are older than the data \ data availability boundary relative to the current epoch.") .takes_value(true) @@ -717,6 +718,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .arg( Arg::with_name("epochs-per-blob-prune") .long("epochs-per-blob-prune") + .value_name("EPOCHS") .help("The epoch interval with which to prune blobs from Lighthouse's \ database when they are older than the data availability boundary \ relative to the current epoch.") @@ -726,6 +728,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .arg( Arg::with_name("blob-prune-margin-epochs") .long("blob-prune-margin-epochs") + .value_name("EPOCHS") .help("The margin for blob pruning in epochs. The oldest blobs are pruned \ up until data_availability_boundary - blob_prune_margin_epochs.") .takes_value(true) diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs index 460084653b..2ff433f11b 100644 --- a/beacon_node/store/src/config.rs +++ b/beacon_node/store/src/config.rs @@ -8,7 +8,6 @@ pub const PREV_DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 2048; pub const DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 8192; pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 5; pub const DEFAULT_HISTORIC_STATE_CACHE_SIZE: usize = 1; -pub const DEFAULT_BLOB_CACHE_SIZE: usize = 5; pub const DEFAULT_EPOCHS_PER_BLOB_PRUNE: u64 = 1; pub const DEFAULT_BLOB_PUNE_MARGIN_EPOCHS: u64 = 0; @@ -23,8 +22,6 @@ pub struct StoreConfig { pub block_cache_size: usize, /// Maximum number of states from freezer database to store in the in-memory state cache. pub historic_state_cache_size: usize, - /// Maximum number of blobs to store in the in-memory blob cache. - pub blob_cache_size: usize, /// Whether to compact the database on initialization. pub compact_on_init: bool, /// Whether to compact the database during database pruning. @@ -59,7 +56,6 @@ impl Default for StoreConfig { slots_per_restore_point_set_explicitly: false, block_cache_size: DEFAULT_BLOCK_CACHE_SIZE, historic_state_cache_size: DEFAULT_HISTORIC_STATE_CACHE_SIZE, - blob_cache_size: DEFAULT_BLOB_CACHE_SIZE, compact_on_init: false, compact_on_prune: true, prune_payloads: true, diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index c19ac03515..11cda6be0c 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ -19,8 +19,6 @@ pub enum Error { }, RlpError(String), BlockNotFound(Hash256), - /// The blobs sidecar mapping to this block root is older than the data availability boundary. - BlobsTooOld(Hash256, Slot), NoContinuationData, SplitPointModified(Slot, Slot), ConfigError(StoreConfigError), diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 74d8e6fa18..a051966646 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -66,10 +66,8 @@ pub struct HotColdDB, Cold: ItemStore> { /// /// The hot database also contains all blocks. pub hot_db: Hot, - /// LRU cache of deserialized blobs. Updated whenever a blob is loaded. - blob_cache: Mutex>>, - /// LRU cache of deserialized blocks. Updated whenever a block is loaded. - block_cache: Mutex>>, + /// LRU cache of deserialized blocks and blobs. Updated whenever a block or blob is loaded. + block_cache: Mutex>, /// LRU cache of replayed states. state_cache: Mutex>>, /// Chain spec. @@ -80,6 +78,46 @@ pub struct HotColdDB, Cold: ItemStore> { _phantom: PhantomData, } +#[derive(Debug)] +struct BlockCache { + block_cache: LruCache>, + blob_cache: LruCache>, +} + +impl BlockCache { + pub fn new(size: usize) -> Self { + Self { + block_cache: LruCache::new(size), + blob_cache: LruCache::new(size), + } + } + pub fn put_block(&mut self, block_root: Hash256, block: SignedBeaconBlock) { + self.block_cache.put(block_root, block); + } + pub fn put_blobs(&mut self, block_root: Hash256, blobs: BlobSidecarList) { + self.blob_cache.put(block_root, blobs); + } + pub fn get_block<'a>( + &'a mut self, + block_root: &Hash256, + ) -> Option<&'a SignedBeaconBlock>> { + self.block_cache.get(block_root) + } + pub fn get_blobs<'a>(&'a mut self, block_root: &Hash256) -> Option<&'a BlobSidecarList> { + self.blob_cache.get(block_root) + } + pub fn delete_block(&mut self, block_root: &Hash256) { + let _ = self.block_cache.pop(block_root); + } + pub fn delete_blobs(&mut self, block_root: &Hash256) { + let _ = self.blob_cache.pop(block_root); + } + pub fn delete(&mut self, block_root: &Hash256) { + let _ = self.block_cache.pop(block_root); + let _ = self.blob_cache.pop(block_root); + } +} + #[derive(Debug, PartialEq)] pub enum HotColdDBError { UnsupportedSchemaVersion { @@ -144,9 +182,8 @@ impl HotColdDB, MemoryStore> { cold_db: MemoryStore::open(), blobs_db: Some(MemoryStore::open()), hot_db: MemoryStore::open(), - block_cache: Mutex::new(LruCache::new(config.block_cache_size)), + block_cache: Mutex::new(BlockCache::new(config.block_cache_size)), state_cache: Mutex::new(LruCache::new(config.historic_state_cache_size)), - blob_cache: Mutex::new(LruCache::new(config.blob_cache_size)), config, spec, log, @@ -182,9 +219,8 @@ impl HotColdDB, LevelDB> { cold_db: LevelDB::open(cold_path)?, blobs_db: None, hot_db: LevelDB::open(hot_path)?, - block_cache: Mutex::new(LruCache::new(config.block_cache_size)), + block_cache: Mutex::new(BlockCache::new(config.block_cache_size)), state_cache: Mutex::new(LruCache::new(config.historic_state_cache_size)), - blob_cache: Mutex::new(LruCache::new(config.blob_cache_size)), config, spec, log, @@ -351,7 +387,7 @@ impl, Cold: ItemStore> HotColdDB let block = self.block_as_kv_store_ops(block_root, block, &mut ops)?; self.hot_db.do_atomically(ops)?; // Update cache. - self.block_cache.lock().put(*block_root, block); + self.block_cache.lock().put_block(*block_root, block); Ok(()) } @@ -403,7 +439,7 @@ impl, Cold: ItemStore> HotColdDB metrics::inc_counter(&metrics::BEACON_BLOCK_GET_COUNT); // Check the cache. - if let Some(block) = self.block_cache.lock().get(block_root) { + if let Some(block) = self.block_cache.lock().get_block(block_root) { metrics::inc_counter(&metrics::BEACON_BLOCK_CACHE_HIT_COUNT); return Ok(Some(DatabaseBlock::Full(block.clone()))); } @@ -428,7 +464,9 @@ impl, Cold: ItemStore> HotColdDB let full_block = self.make_full_block(block_root, blinded_block)?; // Add to cache. - self.block_cache.lock().put(*block_root, full_block.clone()); + self.block_cache + .lock() + .put_block(*block_root, full_block.clone()); DatabaseBlock::Full(full_block) } else if !self.config.prune_payloads { @@ -563,7 +601,7 @@ impl, Cold: ItemStore> HotColdDB /// Delete a block from the store and the block cache. pub fn delete_block(&self, block_root: &Hash256) -> Result<(), Error> { - self.block_cache.lock().pop(block_root); + self.block_cache.lock().delete(block_root); self.hot_db .key_delete(DBColumn::BeaconBlock.into(), block_root.as_bytes())?; self.hot_db @@ -579,7 +617,7 @@ impl, Cold: ItemStore> HotColdDB block_root.as_bytes(), &blobs.as_ssz_bytes(), )?; - self.blob_cache.lock().push(*block_root, blobs); + self.block_cache.lock().put_blobs(*block_root, blobs); Ok(()) } @@ -913,7 +951,6 @@ impl, Cold: ItemStore> HotColdDB // Update database whilst holding a lock on cache, to ensure that the cache updates // atomically with the database. let mut guard = self.block_cache.lock(); - let mut guard_blob = self.blob_cache.lock(); let blob_cache_ops = blobs_ops.clone(); let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db); @@ -947,7 +984,7 @@ impl, Cold: ItemStore> HotColdDB for op in hot_db_cache_ops { match op { StoreOp::PutBlock(block_root, block) => { - guard.put(block_root, (*block).clone()); + guard.put_block(block_root, (*block).clone()); } StoreOp::PutBlobs(_, _) => (), @@ -961,7 +998,7 @@ impl, Cold: ItemStore> HotColdDB StoreOp::DeleteStateTemporaryFlag(_) => (), StoreOp::DeleteBlock(block_root) => { - guard.pop(&block_root); + guard.delete_block(&block_root); } StoreOp::DeleteBlobs(_) => (), @@ -979,11 +1016,11 @@ impl, Cold: ItemStore> HotColdDB for op in blob_cache_ops { match op { StoreOp::PutBlobs(block_root, blobs) => { - guard_blob.put(block_root, blobs); + guard.put_blobs(block_root, blobs); } StoreOp::DeleteBlobs(block_root) => { - guard_blob.pop(&block_root); + guard.delete_blobs(&block_root); } _ => (), @@ -991,7 +1028,6 @@ impl, Cold: ItemStore> HotColdDB } drop(guard); - drop(guard_blob); Ok(()) } @@ -1360,12 +1396,18 @@ impl, Cold: ItemStore> HotColdDB pub fn get_blobs(&self, block_root: &Hash256) -> Result>, Error> { let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db); + // Check the cache. + if let Some(blobs) = self.block_cache.lock().get_blobs(block_root) { + metrics::inc_counter(&metrics::BEACON_BLOBS_CACHE_HIT_COUNT); + return Ok(Some(blobs.clone())); + } + match blobs_db.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? { Some(ref blobs_bytes) => { let blobs = BlobSidecarList::from_ssz_bytes(blobs_bytes)?; - // FIXME(sean) I was attempting to use a blob cache here but was getting deadlocks, - // may want to attempt to use one again - self.blob_cache.lock().put(*block_root, blobs.clone()); + self.block_cache + .lock() + .put_blobs(*block_root, blobs.clone()); Ok(Some(blobs)) } None => Ok(None), diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index e6b6e730e4..074c05a9e1 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -7,7 +7,6 @@ //! //! Provides a simple API for storing/retrieving all types that sometimes needs type-hints. See //! tests for implementation examples. -#![allow(dead_code)] #[macro_use] extern crate lazy_static; diff --git a/beacon_node/store/src/metrics.rs b/beacon_node/store/src/metrics.rs index 72c5e61969..2d901fdd93 100644 --- a/beacon_node/store/src/metrics.rs +++ b/beacon_node/store/src/metrics.rs @@ -101,6 +101,10 @@ lazy_static! { "store_beacon_block_cache_hit_total", "Number of hits to the store's block cache" ); + pub static ref BEACON_BLOBS_CACHE_HIT_COUNT: Result = try_create_int_counter( + "store_beacon_blobs_cache_hit_total", + "Number of hits to the store's blob cache" + ); pub static ref BEACON_BLOCK_READ_TIMES: Result = try_create_histogram( "store_beacon_block_read_overhead_seconds", "Overhead on reading a beacon block from the DB (e.g., decoding)" diff --git a/common/eth2/Cargo.toml b/common/eth2/Cargo.toml index d0cb62da29..4f050a01e4 100644 --- a/common/eth2/Cargo.toml +++ b/common/eth2/Cargo.toml @@ -10,6 +10,7 @@ edition = "2021" serde = { version = "1.0.116", features = ["derive"] } serde_json = "1.0.58" ssz_types = "0.5.4" +tree_hash = "0.5.2" types = { path = "../../consensus/types" } reqwest = { version = "0.11.0", features = ["json", "stream"] } lighthouse_network = { path = "../../beacon_node/lighthouse_network" } diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 0a61d075d4..5b113cac99 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -4,12 +4,16 @@ use crate::Error as ServerError; use lighthouse_network::{ConnectionDirection, Enr, Multiaddr, PeerConnectionStatus}; use mediatype::{names, MediaType, MediaTypeList}; -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Deserializer, Serialize}; +use serde_json::Value; use ssz_derive::Encode; use std::convert::TryFrom; use std::fmt::{self, Display}; use std::str::{from_utf8, FromStr}; use std::time::Duration; +use tree_hash::TreeHash; +use types::beacon_block_body::BuilderKzgCommitments; +use types::builder_bid::BlindedBlobsBundle; pub use types::*; #[cfg(feature = "lighthouse")] @@ -1703,3 +1707,100 @@ impl> ForkVersionDeserialize }) } } + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Encode)] +#[serde(untagged)] +#[serde(bound = "E: EthSpec")] +#[ssz(enum_behaviour = "transparent")] +pub enum FullPayloadContents { + Payload(ExecutionPayload), + PayloadAndBlobs(ExecutionPayloadAndBlobs), +} + +impl FullPayloadContents { + pub fn new( + execution_payload: ExecutionPayload, + maybe_blobs: Option>, + ) -> Self { + match maybe_blobs { + None => Self::Payload(execution_payload), + Some(blobs_bundle) => Self::PayloadAndBlobs(ExecutionPayloadAndBlobs { + execution_payload, + blobs_bundle, + }), + } + } + + pub fn payload_ref(&self) -> &ExecutionPayload { + match self { + FullPayloadContents::Payload(payload) => payload, + FullPayloadContents::PayloadAndBlobs(payload_and_blobs) => { + &payload_and_blobs.execution_payload + } + } + } + + pub fn block_hash(&self) -> ExecutionBlockHash { + self.payload_ref().block_hash() + } + + pub fn deconstruct(self) -> (ExecutionPayload, Option>) { + match self { + FullPayloadContents::Payload(payload) => (payload, None), + FullPayloadContents::PayloadAndBlobs(payload_and_blobs) => ( + payload_and_blobs.execution_payload, + Some(payload_and_blobs.blobs_bundle), + ), + } + } +} + +impl ForkVersionDeserialize for FullPayloadContents { + fn deserialize_by_fork<'de, D: Deserializer<'de>>( + value: Value, + fork_name: ForkName, + ) -> Result { + match fork_name { + ForkName::Merge | ForkName::Capella => serde_json::from_value(value) + .map(Self::Payload) + .map_err(serde::de::Error::custom), + ForkName::Deneb => serde_json::from_value(value) + .map(Self::PayloadAndBlobs) + .map_err(serde::de::Error::custom), + ForkName::Base | ForkName::Altair => Err(serde::de::Error::custom(format!( + "FullPayloadContents deserialization for {fork_name} not implemented" + ))), + } + } +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Encode)] +#[serde(bound = "E: EthSpec")] +pub struct ExecutionPayloadAndBlobs { + pub execution_payload: ExecutionPayload, + pub blobs_bundle: BlobsBundle, +} + +#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, Encode)] +#[serde(bound = "E: EthSpec")] +pub struct BlobsBundle { + pub commitments: BuilderKzgCommitments, + pub proofs: KzgProofs, + #[serde(with = "ssz_types::serde_utils::list_of_hex_fixed_vec")] + pub blobs: BlobsList, +} + +impl Into> for BlobsBundle { + fn into(self) -> BlindedBlobsBundle { + BlindedBlobsBundle { + commitments: self.commitments, + proofs: self.proofs, + blob_roots: self + .blobs + .into_iter() + .map(|blob| blob.tree_hash_root()) + .collect::>() + .into(), + } + } +} diff --git a/common/eth2_network_config/built_in_network_configs/deneb/boot_enr.yaml b/common/eth2_network_config/built_in_network_configs/deneb/boot_enr.yaml deleted file mode 100644 index b4947f4288..0000000000 --- a/common/eth2_network_config/built_in_network_configs/deneb/boot_enr.yaml +++ /dev/null @@ -1 +0,0 @@ -- enr:-Iq4QAw-ZQb0IiosZgDDcK5ehLs1XmwT0BWU1E1W3ZnhlAAwAE3I46dgCsCbeB5QUwcpDmpFfveTfKF7-tiIg0KWGjqGAYXoIfe6gmlkgnY0gmlwhKEjXcqJc2VjcDI1NmsxoQN4HpB2GMFY2MzwO9hGFjqRG47OX4hGDliAG-mJNWkEr4N1ZHCCIyk diff --git a/common/eth2_network_config/built_in_network_configs/deneb/config.yaml b/common/eth2_network_config/built_in_network_configs/deneb/config.yaml deleted file mode 100644 index 350a06728b..0000000000 --- a/common/eth2_network_config/built_in_network_configs/deneb/config.yaml +++ /dev/null @@ -1,76 +0,0 @@ -# Extends the mainnet preset -PRESET_BASE: 'mainnet' -CONFIG_NAME: 'deneb' # needs to exist because of Prysm. Otherwise it conflicts with mainnet genesis and needs to match configuration in common_eth2_config/src/lib.rs to pass lh ci. - -# Genesis -# --------------------------------------------------------------- -# `2**14` (= 16,384) -MIN_GENESIS_ACTIVE_VALIDATOR_COUNT: 9000 -# Mar-01-2021 08:53:32 AM +UTC -# This is an invalid valid and should be updated when you create the genesis -MIN_GENESIS_TIME: 1674639000 -GENESIS_FORK_VERSION: 0x10484404 -GENESIS_DELAY: 120 - - -# Forking -# --------------------------------------------------------------- -# Some forks are disabled for now: -# - These may be re-assigned to another fork-version later -# - Temporarily set to max uint64 value: 2**64 - 1 - -# Altair -ALTAIR_FORK_VERSION: 0x20484404 -ALTAIR_FORK_EPOCH: 0 -# Merge -BELLATRIX_FORK_VERSION: 0x30484404 -BELLATRIX_FORK_EPOCH: 0 -TERMINAL_TOTAL_DIFFICULTY: 0 -TERMINAL_BLOCK_HASH: 0x0000000000000000000000000000000000000000000000000000000000000000 -TERMINAL_BLOCK_HASH_ACTIVATION_EPOCH: 18446744073709551615 - -# Capella -CAPELLA_FORK_VERSION: 0x40484404 -CAPELLA_FORK_EPOCH: 1 - -# DENEB/Deneb -DENEB_FORK_VERSION: 0x50484404 -DENEB_FORK_EPOCH: 5 - -# Time parameters -# --------------------------------------------------------------- -# 12 seconds -SECONDS_PER_SLOT: 12 -# 14 (estimate from Eth1 mainnet) -SECONDS_PER_ETH1_BLOCK: 12 -# 2**0 (= 1) epochs ~1 hours -MIN_VALIDATOR_WITHDRAWABILITY_DELAY: 1 -# 2**8 (= 256) epochs ~27 hours -SHARD_COMMITTEE_PERIOD: 1 -# 2**11 (= 2,048) Eth1 blocks ~8 hours -ETH1_FOLLOW_DISTANCE: 12 - - -# Validator cycle -# --------------------------------------------------------------- -# 2**2 (= 4) -INACTIVITY_SCORE_BIAS: 4 -# 2**4 (= 16) -INACTIVITY_SCORE_RECOVERY_RATE: 16 -# 2**4 * 10**9 (= 16,000,000,000) Gwei -EJECTION_BALANCE: 31000000000 -# 2**2 (= 4) -MIN_PER_EPOCH_CHURN_LIMIT: 4 -# 2**16 (= 65,536) -CHURN_LIMIT_QUOTIENT: 65536 - -# Fork choice -# --------------------------------------------------------------- -# 40% -PROPOSER_SCORE_BOOST: 40 - -# Deposit contract -# --------------------------------------------------------------- -DEPOSIT_CHAIN_ID: 4844001004 -DEPOSIT_NETWORK_ID: 4844001004 -DEPOSIT_CONTRACT_ADDRESS: 0x4242424242424242424242424242424242424242 diff --git a/common/eth2_network_config/built_in_network_configs/deneb/genesis.ssz.zip b/common/eth2_network_config/built_in_network_configs/deneb/genesis.ssz.zip deleted file mode 100644 index 0df154bbd7..0000000000 Binary files a/common/eth2_network_config/built_in_network_configs/deneb/genesis.ssz.zip and /dev/null differ diff --git a/consensus/serde_utils/src/u256_hex_be_opt.rs b/consensus/serde_utils/src/u256_hex_be_opt.rs deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/consensus/state_processing/src/per_block_processing/deneb.rs b/consensus/state_processing/src/per_block_processing/deneb.rs index 68d53da9d7..8f7cb0514f 100644 --- a/consensus/state_processing/src/per_block_processing/deneb.rs +++ b/consensus/state_processing/src/per_block_processing/deneb.rs @@ -1,2 +1,9 @@ -#[allow(clippy::module_inception)] -pub mod deneb; +use ethereum_hashing::hash_fixed; +use types::consts::deneb::VERSIONED_HASH_VERSION_KZG; +use types::{KzgCommitment, VersionedHash}; + +pub fn kzg_commitment_to_versioned_hash(kzg_commitment: &KzgCommitment) -> VersionedHash { + let mut hashed_commitment = hash_fixed(&kzg_commitment.0); + hashed_commitment[0] = VERSIONED_HASH_VERSION_KZG; + VersionedHash::from(hashed_commitment) +} diff --git a/consensus/state_processing/src/per_block_processing/deneb/deneb.rs b/consensus/state_processing/src/per_block_processing/deneb/deneb.rs deleted file mode 100644 index 8f7cb0514f..0000000000 --- a/consensus/state_processing/src/per_block_processing/deneb/deneb.rs +++ /dev/null @@ -1,9 +0,0 @@ -use ethereum_hashing::hash_fixed; -use types::consts::deneb::VERSIONED_HASH_VERSION_KZG; -use types::{KzgCommitment, VersionedHash}; - -pub fn kzg_commitment_to_versioned_hash(kzg_commitment: &KzgCommitment) -> VersionedHash { - let mut hashed_commitment = hash_fixed(&kzg_commitment.0); - hashed_commitment[0] = VERSIONED_HASH_VERSION_KZG; - VersionedHash::from(hashed_commitment) -} diff --git a/consensus/state_processing/src/per_block_processing/errors.rs b/consensus/state_processing/src/per_block_processing/errors.rs index 016460080d..de1c132951 100644 --- a/consensus/state_processing/src/per_block_processing/errors.rs +++ b/consensus/state_processing/src/per_block_processing/errors.rs @@ -88,18 +88,6 @@ pub enum BlockProcessingError { expected: Hash256, found: Hash256, }, - BlobVersionHashMismatch, - /// The number of commitments in blob transactions in the payload does not match the number - /// of commitments in the block. - BlobNumCommitmentsMismatch { - commitments_processed_in_block: usize, - /// This number depic - commitments_processed_in_transactions: usize, - }, - BlobVersionHashIndexOutOfBounds { - index: usize, - length: usize, - }, WithdrawalCredentialsInvalid, ParticipationCacheError(ParticipationCacheError), } diff --git a/consensus/types/src/beacon_block_body.rs b/consensus/types/src/beacon_block_body.rs index 6c31e8cc1a..decccb1fbf 100644 --- a/consensus/types/src/beacon_block_body.rs +++ b/consensus/types/src/beacon_block_body.rs @@ -9,19 +9,21 @@ use superstruct::superstruct; use test_random_derive::TestRandom; use tree_hash_derive::TreeHash; -pub type KzgCommitments = VariableList::MaxBlobsPerBlock>; +//TODO: Remove this type and use `BlockBodyKzgCommitments` everywhere when this PR is merged: +// https://github.com/ethereum/builder-specs/pull/87 +pub type BuilderKzgCommitments = VariableList::MaxBlobsPerBlock>; pub type BlockBodyKzgCommitments = VariableList::MaxBlobCommitmentsPerBlock>; pub fn to_block_kzg_commitments( - commitments: KzgCommitments, + commitments: BuilderKzgCommitments, ) -> BlockBodyKzgCommitments { commitments.to_vec().into() } pub fn from_block_kzg_commitments( commitments: &BlockBodyKzgCommitments, -) -> KzgCommitments { +) -> BuilderKzgCommitments { commitments.to_vec().into() } diff --git a/consensus/types/src/blob_sidecar.rs b/consensus/types/src/blob_sidecar.rs index 4e29e7cd69..f19068edfb 100644 --- a/consensus/types/src/blob_sidecar.rs +++ b/consensus/types/src/blob_sidecar.rs @@ -1,112 +1,21 @@ +use crate::test_utils::TestRandom; +use crate::{Blob, ChainSpec, Domain, EthSpec, Fork, Hash256, SignedBlobSidecar, SignedRoot, Slot}; +use bls::SecretKey; +use derivative::Derivative; +use kzg::{Kzg, KzgCommitment, KzgPreset, KzgProof}; +use rand::Rng; +use serde_derive::{Deserialize, Serialize}; +use ssz::Encode; +use ssz_derive::{Decode, Encode}; +use ssz_types::{FixedVector, VariableList}; use std::fmt::Debug; use std::hash::Hash; use std::marker::PhantomData; use std::sync::Arc; - -use derivative::Derivative; -use kzg::{Kzg, KzgCommitment, KzgPreset, KzgProof}; -use rand::Rng; -use serde::de::DeserializeOwned; -use serde_derive::{Deserialize, Serialize}; -use ssz::{Decode, Encode}; -use ssz_derive::{Decode, Encode}; -use ssz_types::{FixedVector, VariableList}; +use test_random_derive::TestRandom; use tree_hash::TreeHash; use tree_hash_derive::TreeHash; -use bls::SecretKey; -use test_random_derive::TestRandom; - -use crate::beacon_block_body::KzgCommitments; -use crate::test_utils::TestRandom; -use crate::{ - AbstractExecPayload, BeaconBlock, Blob, ChainSpec, Domain, EthSpec, Fork, Hash256, - SignedBlobSidecar, SignedRoot, Slot, -}; - -pub trait Sidecar: - serde::Serialize - + Clone - + DeserializeOwned - + Encode - + Decode - + Hash - + TreeHash - + TestRandom - + Debug - + SignedRoot - + Sync - + Send - + for<'a> arbitrary::Arbitrary<'a> -{ - type BlobItems: BlobItems; - fn slot(&self) -> Slot; - fn build_sidecar>( - blob_items: Self::BlobItems, - block: &BeaconBlock, - expected_kzg_commitments: &KzgCommitments, - kzg_proofs: Vec, - ) -> Result, String>; -} - -pub trait BlobItems: Sync + Send + Sized { - fn try_from_blob_roots(roots: BlobRootsList) -> Result; - fn try_from_blobs(blobs: BlobsList) -> Result; - fn len(&self) -> usize; - fn is_empty(&self) -> bool; - fn blobs(&self) -> Option<&BlobsList>; -} - -impl BlobItems for BlobsList { - fn try_from_blob_roots(_roots: BlobRootsList) -> Result { - Err("Unexpected conversion from blob roots to blobs".to_string()) - } - - fn try_from_blobs(blobs: BlobsList) -> Result { - Ok(blobs) - } - - fn len(&self) -> usize { - VariableList::len(self) - } - - fn is_empty(&self) -> bool { - VariableList::is_empty(self) - } - - fn blobs(&self) -> Option<&BlobsList> { - Some(self) - } -} - -impl BlobItems for BlobRootsList { - fn try_from_blob_roots(roots: BlobRootsList) -> Result { - Ok(roots) - } - - fn try_from_blobs(blobs: BlobsList) -> Result { - VariableList::new( - blobs - .into_iter() - .map(|blob| blob.tree_hash_root()) - .collect(), - ) - .map_err(|e| format!("{e:?}")) - } - - fn len(&self) -> usize { - VariableList::len(self) - } - - fn is_empty(&self) -> bool { - VariableList::is_empty(self) - } - - fn blobs(&self) -> Option<&BlobsList> { - None - } -} - /// Container of the data that identifies an individual blob. #[derive( Serialize, Deserialize, Encode, Decode, TreeHash, Copy, Clone, Debug, PartialEq, Eq, Hash, @@ -158,52 +67,6 @@ pub struct BlobSidecar { pub kzg_proof: KzgProof, } -impl Sidecar for BlobSidecar { - type BlobItems = BlobsList; - - fn slot(&self) -> Slot { - self.slot - } - - fn build_sidecar>( - blobs: BlobsList, - block: &BeaconBlock, - expected_kzg_commitments: &KzgCommitments, - kzg_proofs: Vec, - ) -> Result, String> { - let beacon_block_root = block.canonical_root(); - let slot = block.slot(); - let blob_sidecars = BlobSidecarList::from( - blobs - .into_iter() - .enumerate() - .map(|(blob_index, blob)| { - let kzg_commitment = expected_kzg_commitments - .get(blob_index) - .ok_or("KZG commitment should exist for blob")?; - - let kzg_proof = kzg_proofs - .get(blob_index) - .ok_or("KZG proof should exist for blob")?; - - Ok(Arc::new(BlobSidecar { - block_root: beacon_block_root, - index: blob_index as u64, - slot, - block_parent_root: block.parent_root(), - proposer_index: block.proposer_index(), - blob, - kzg_commitment: *kzg_commitment, - kzg_proof: *kzg_proof, - })) - }) - .collect::, String>>()?, - ); - - Ok(blob_sidecars) - } -} - impl From>> for BlindedBlobSidecar { fn from(blob_sidecar: Arc>) -> Self { BlindedBlobSidecar { @@ -353,54 +216,6 @@ pub struct BlindedBlobSidecar { impl SignedRoot for BlindedBlobSidecar {} -impl Sidecar for BlindedBlobSidecar { - type BlobItems = BlobRootsList; - - fn slot(&self) -> Slot { - self.slot - } - - fn build_sidecar>( - blob_roots: BlobRootsList, - block: &BeaconBlock, - expected_kzg_commitments: &KzgCommitments, - kzg_proofs: Vec, - ) -> Result, String> { - let beacon_block_root = block.canonical_root(); - let slot = block.slot(); - - let blob_sidecars = BlindedBlobSidecarList::::from( - blob_roots - .into_iter() - .enumerate() - .map(|(blob_index, blob_root)| { - let kzg_commitment = expected_kzg_commitments - .get(blob_index) - .ok_or("KZG commitment should exist for blob")?; - - let kzg_proof = kzg_proofs.get(blob_index).ok_or(format!( - "Missing KZG proof for slot {} blob index: {}", - slot, blob_index - ))?; - - Ok(Arc::new(BlindedBlobSidecar { - block_root: beacon_block_root, - index: blob_index as u64, - slot, - block_parent_root: block.parent_root(), - proposer_index: block.proposer_index(), - blob_root, - kzg_commitment: *kzg_commitment, - kzg_proof: *kzg_proof, - })) - }) - .collect::, String>>()?, - ); - - Ok(blob_sidecars) - } -} - pub type SidecarList = VariableList, ::MaxBlobsPerBlock>; pub type BlobSidecarList = SidecarList>; pub type BlindedBlobSidecarList = SidecarList; diff --git a/consensus/types/src/builder_bid.rs b/consensus/types/src/builder_bid.rs index bc358dc17f..cdd240716d 100644 --- a/consensus/types/src/builder_bid.rs +++ b/consensus/types/src/builder_bid.rs @@ -1,8 +1,8 @@ -use crate::beacon_block_body::KzgCommitments; +use crate::beacon_block_body::BuilderKzgCommitments; use crate::{ - BlobRootsList, BlobsBundle, ChainSpec, EthSpec, ExecutionPayloadHeaderCapella, - ExecutionPayloadHeaderDeneb, ExecutionPayloadHeaderMerge, ExecutionPayloadHeaderRef, ForkName, - ForkVersionDeserialize, KzgProofs, SignedRoot, Uint256, + BlobRootsList, ChainSpec, EthSpec, ExecutionPayloadHeaderCapella, ExecutionPayloadHeaderDeneb, + ExecutionPayloadHeaderMerge, ExecutionPayloadHeaderRef, ForkName, ForkVersionDeserialize, + KzgProofs, SignedRoot, Uint256, }; use bls::PublicKeyBytes; use bls::Signature; @@ -10,32 +10,16 @@ use serde::Deserializer; use serde_derive::{Deserialize, Serialize}; use ssz_derive::Encode; use superstruct::superstruct; -use tree_hash::TreeHash; use tree_hash_derive::TreeHash; #[derive(PartialEq, Debug, Default, Serialize, Deserialize, TreeHash, Clone, Encode)] #[serde(bound = "E: EthSpec")] pub struct BlindedBlobsBundle { - pub commitments: KzgCommitments, + pub commitments: BuilderKzgCommitments, pub proofs: KzgProofs, pub blob_roots: BlobRootsList, } -impl From> for BlindedBlobsBundle { - fn from(blobs_bundle: BlobsBundle) -> Self { - BlindedBlobsBundle { - commitments: blobs_bundle.commitments, - proofs: blobs_bundle.proofs, - blob_roots: blobs_bundle - .blobs - .into_iter() - .map(|blob| blob.tree_hash_root()) - .collect::>() - .into(), - } - } -} - #[superstruct( variants(Merge, Capella, Deneb), variant_attributes( diff --git a/consensus/types/src/consts.rs b/consensus/types/src/consts.rs index cdeff2cbd5..7fa03dc5f8 100644 --- a/consensus/types/src/consts.rs +++ b/consensus/types/src/consts.rs @@ -36,4 +36,5 @@ pub mod deneb { } pub const VERSIONED_HASH_VERSION_KZG: u8 = 1; pub const BLOB_SIDECAR_SUBNET_COUNT: u64 = 6; + pub const MAX_BLOBS_PER_BLOCK: u64 = BLOB_SIDECAR_SUBNET_COUNT; } diff --git a/consensus/types/src/execution_block_header.rs b/consensus/types/src/execution_block_header.rs index 5ec5484cab..945222a925 100644 --- a/consensus/types/src/execution_block_header.rs +++ b/consensus/types/src/execution_block_header.rs @@ -24,7 +24,7 @@ use metastruct::metastruct; /// /// Credit to Reth for the type definition. #[derive(Debug, Clone, PartialEq, Eq, Hash)] -#[metastruct(mappings(map_execution_block_header_fields_except_withdrawals(exclude( +#[metastruct(mappings(map_execution_block_header_fields_base(exclude( withdrawals_root, blob_gas_used, excess_blob_gas, diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index 5008bdf632..470e6acc18 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -99,6 +99,7 @@ pub mod slot_data; pub mod sqlite; pub mod blob_sidecar; +pub mod sidecar; pub mod signed_blob; use ethereum_types::{H160, H256}; @@ -121,7 +122,7 @@ pub use crate::beacon_committee::{BeaconCommittee, OwnedBeaconCommittee}; pub use crate::beacon_state::{BeaconTreeHashCache, Error as BeaconStateError, *}; pub use crate::blob_sidecar::{ BlindedBlobSidecar, BlindedBlobSidecarList, BlobRootsList, BlobSidecar, BlobSidecarList, - BlobsList, Sidecar, SidecarList, + BlobsList, SidecarList, }; pub use crate::bls_to_execution_change::BlsToExecutionChange; pub use crate::chain_spec::{ChainSpec, Config, Domain}; @@ -161,9 +162,8 @@ pub use crate::participation_flags::ParticipationFlags; pub use crate::participation_list::ParticipationList; pub use crate::payload::{ AbstractExecPayload, BlindedPayload, BlindedPayloadCapella, BlindedPayloadDeneb, - BlindedPayloadMerge, BlindedPayloadRef, BlobsBundle, BlockType, ExecPayload, - ExecutionPayloadAndBlobs, FullPayload, FullPayloadCapella, FullPayloadContents, - FullPayloadDeneb, FullPayloadMerge, FullPayloadRef, OwnedExecPayload, + BlindedPayloadMerge, BlindedPayloadRef, BlockType, ExecPayload, FullPayload, + FullPayloadCapella, FullPayloadDeneb, FullPayloadMerge, FullPayloadRef, OwnedExecPayload, }; pub use crate::pending_attestation::PendingAttestation; pub use crate::preset::{AltairPreset, BasePreset, BellatrixPreset, CapellaPreset}; @@ -221,5 +221,6 @@ pub use bls::{ pub use kzg::{KzgCommitment, KzgProof}; +pub use sidecar::Sidecar; pub use ssz_types::{typenum, typenum::Unsigned, BitList, BitVector, FixedVector, VariableList}; pub use superstruct::superstruct; diff --git a/consensus/types/src/payload.rs b/consensus/types/src/payload.rs index a1b513d3fa..f89e325986 100644 --- a/consensus/types/src/payload.rs +++ b/consensus/types/src/payload.rs @@ -1,9 +1,7 @@ -use crate::beacon_block_body::KzgCommitments; use crate::{test_utils::TestRandom, *}; use derivative::Derivative; use serde::de::DeserializeOwned; -use serde::{Deserialize, Deserializer, Serialize}; -use serde_json::Value; +use serde::{Deserialize, Serialize}; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use std::borrow::Cow; @@ -971,85 +969,3 @@ impl From> for ExecutionPayloadHeader { } } } - -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Encode)] -#[serde(untagged)] -#[serde(bound = "E: EthSpec")] -#[ssz(enum_behaviour = "transparent")] -pub enum FullPayloadContents { - Payload(ExecutionPayload), - PayloadAndBlobs(ExecutionPayloadAndBlobs), -} - -impl FullPayloadContents { - pub fn new( - execution_payload: ExecutionPayload, - maybe_blobs: Option>, - ) -> Self { - match maybe_blobs { - None => Self::Payload(execution_payload), - Some(blobs_bundle) => Self::PayloadAndBlobs(ExecutionPayloadAndBlobs { - execution_payload, - blobs_bundle, - }), - } - } - - pub fn payload_ref(&self) -> &ExecutionPayload { - match self { - FullPayloadContents::Payload(payload) => payload, - FullPayloadContents::PayloadAndBlobs(payload_and_blobs) => { - &payload_and_blobs.execution_payload - } - } - } - - pub fn block_hash(&self) -> ExecutionBlockHash { - self.payload_ref().block_hash() - } - - pub fn deconstruct(self) -> (ExecutionPayload, Option>) { - match self { - FullPayloadContents::Payload(payload) => (payload, None), - FullPayloadContents::PayloadAndBlobs(payload_and_blobs) => ( - payload_and_blobs.execution_payload, - Some(payload_and_blobs.blobs_bundle), - ), - } - } -} - -impl ForkVersionDeserialize for FullPayloadContents { - fn deserialize_by_fork<'de, D: Deserializer<'de>>( - value: Value, - fork_name: ForkName, - ) -> Result { - match fork_name { - ForkName::Merge | ForkName::Capella => serde_json::from_value(value) - .map(Self::Payload) - .map_err(serde::de::Error::custom), - ForkName::Deneb => serde_json::from_value(value) - .map(Self::PayloadAndBlobs) - .map_err(serde::de::Error::custom), - ForkName::Base | ForkName::Altair => Err(serde::de::Error::custom(format!( - "FullPayloadContents deserialization for {fork_name} not implemented" - ))), - } - } -} - -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Encode)] -#[serde(bound = "E: EthSpec")] -pub struct ExecutionPayloadAndBlobs { - pub execution_payload: ExecutionPayload, - pub blobs_bundle: BlobsBundle, -} - -#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, Encode)] -#[serde(bound = "E: EthSpec")] -pub struct BlobsBundle { - pub commitments: KzgCommitments, - pub proofs: KzgProofs, - #[serde(with = "ssz_types::serde_utils::list_of_hex_fixed_vec")] - pub blobs: BlobsList, -} diff --git a/consensus/types/src/sidecar.rs b/consensus/types/src/sidecar.rs new file mode 100644 index 0000000000..7feb85ce8f --- /dev/null +++ b/consensus/types/src/sidecar.rs @@ -0,0 +1,191 @@ +use crate::beacon_block_body::BuilderKzgCommitments; +use crate::test_utils::TestRandom; +use crate::{ + AbstractExecPayload, BeaconBlock, BlindedBlobSidecar, BlindedBlobSidecarList, BlobRootsList, + BlobSidecar, BlobSidecarList, BlobsList, EthSpec, SidecarList, SignedRoot, Slot, +}; +use kzg::KzgProof; +use serde::de::DeserializeOwned; +use ssz::{Decode, Encode}; +use ssz_types::VariableList; +use std::fmt::Debug; +use std::hash::Hash; +use std::sync::Arc; +use tree_hash::TreeHash; + +pub trait Sidecar: + serde::Serialize + + Clone + + DeserializeOwned + + Encode + + Decode + + Hash + + TreeHash + + TestRandom + + Debug + + SignedRoot + + Sync + + Send + + for<'a> arbitrary::Arbitrary<'a> +{ + type BlobItems: BlobItems; + fn slot(&self) -> Slot; + fn build_sidecar>( + blob_items: Self::BlobItems, + block: &BeaconBlock, + expected_kzg_commitments: &BuilderKzgCommitments, + kzg_proofs: Vec, + ) -> Result, String>; +} + +pub trait BlobItems: Sync + Send + Sized { + fn try_from_blob_roots(roots: BlobRootsList) -> Result; + fn try_from_blobs(blobs: BlobsList) -> Result; + fn len(&self) -> usize; + fn is_empty(&self) -> bool; + fn blobs(&self) -> Option<&BlobsList>; +} + +impl BlobItems for BlobsList { + fn try_from_blob_roots(_roots: BlobRootsList) -> Result { + Err("Unexpected conversion from blob roots to blobs".to_string()) + } + + fn try_from_blobs(blobs: BlobsList) -> Result { + Ok(blobs) + } + + fn len(&self) -> usize { + VariableList::len(self) + } + + fn is_empty(&self) -> bool { + VariableList::is_empty(self) + } + + fn blobs(&self) -> Option<&BlobsList> { + Some(self) + } +} + +impl BlobItems for BlobRootsList { + fn try_from_blob_roots(roots: BlobRootsList) -> Result { + Ok(roots) + } + + fn try_from_blobs(blobs: BlobsList) -> Result { + VariableList::new( + blobs + .into_iter() + .map(|blob| blob.tree_hash_root()) + .collect(), + ) + .map_err(|e| format!("{e:?}")) + } + + fn len(&self) -> usize { + VariableList::len(self) + } + + fn is_empty(&self) -> bool { + VariableList::is_empty(self) + } + + fn blobs(&self) -> Option<&BlobsList> { + None + } +} + +impl Sidecar for BlobSidecar { + type BlobItems = BlobsList; + + fn slot(&self) -> Slot { + self.slot + } + + fn build_sidecar>( + blobs: BlobsList, + block: &BeaconBlock, + expected_kzg_commitments: &BuilderKzgCommitments, + kzg_proofs: Vec, + ) -> Result, String> { + let beacon_block_root = block.canonical_root(); + let slot = block.slot(); + let blob_sidecars = BlobSidecarList::from( + blobs + .into_iter() + .enumerate() + .map(|(blob_index, blob)| { + let kzg_commitment = expected_kzg_commitments + .get(blob_index) + .ok_or("KZG commitment should exist for blob")?; + + let kzg_proof = kzg_proofs + .get(blob_index) + .ok_or("KZG proof should exist for blob")?; + + Ok(Arc::new(BlobSidecar { + block_root: beacon_block_root, + index: blob_index as u64, + slot, + block_parent_root: block.parent_root(), + proposer_index: block.proposer_index(), + blob, + kzg_commitment: *kzg_commitment, + kzg_proof: *kzg_proof, + })) + }) + .collect::, String>>()?, + ); + + Ok(blob_sidecars) + } +} + +impl Sidecar for BlindedBlobSidecar { + type BlobItems = BlobRootsList; + + fn slot(&self) -> Slot { + self.slot + } + + fn build_sidecar>( + blob_roots: BlobRootsList, + block: &BeaconBlock, + expected_kzg_commitments: &BuilderKzgCommitments, + kzg_proofs: Vec, + ) -> Result, String> { + let beacon_block_root = block.canonical_root(); + let slot = block.slot(); + + let blob_sidecars = BlindedBlobSidecarList::::from( + blob_roots + .into_iter() + .enumerate() + .map(|(blob_index, blob_root)| { + let kzg_commitment = expected_kzg_commitments + .get(blob_index) + .ok_or("KZG commitment should exist for blob")?; + + let kzg_proof = kzg_proofs.get(blob_index).ok_or(format!( + "Missing KZG proof for slot {} blob index: {}", + slot, blob_index + ))?; + + Ok(Arc::new(BlindedBlobSidecar { + block_root: beacon_block_root, + index: blob_index as u64, + slot, + block_parent_root: block.parent_root(), + proposer_index: block.proposer_index(), + blob_root, + kzg_commitment: *kzg_commitment, + kzg_proof: *kzg_proof, + })) + }) + .collect::, String>>()?, + ); + + Ok(blob_sidecars) + } +} diff --git a/consensus/types/src/signed_blob.rs b/consensus/types/src/signed_blob.rs index ff3daa29ac..e960558a3e 100644 --- a/consensus/types/src/signed_blob.rs +++ b/consensus/types/src/signed_blob.rs @@ -1,6 +1,7 @@ +use crate::sidecar::Sidecar; use crate::{ test_utils::TestRandom, BlindedBlobSidecar, Blob, BlobSidecar, ChainSpec, Domain, EthSpec, - Fork, Hash256, Sidecar, Signature, SignedRoot, SigningData, + Fork, Hash256, Signature, SignedRoot, SigningData, }; use bls::PublicKey; use derivative::Derivative; diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index 2428fa5262..184ba69438 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -107,6 +107,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .arg( Arg::with_name("blob-prune-margin-epochs") .long("blob-prune-margin-epochs") + .value_name("EPOCHS") .help( "The margin for blob pruning in epochs. The oldest blobs are pruned \ up until data_availability_boundary - blob_prune_margin_epochs.", diff --git a/testing/ef_tests/check_all_files_accessed.py b/testing/ef_tests/check_all_files_accessed.py index 7c30029f38..a5ab897c33 100755 --- a/testing/ef_tests/check_all_files_accessed.py +++ b/testing/ef_tests/check_all_files_accessed.py @@ -51,9 +51,6 @@ excluded_paths = [ "bls12-381-tests/deserialization_G1", "bls12-381-tests/deserialization_G2", "bls12-381-tests/hash_to_G2", - # FIXME(sean) - "tests/mainnet/capella/light_client/single_merkle_proof/BeaconBlockBody/*", - "tests/mainnet/deneb/light_client/single_merkle_proof/BeaconBlockBody/*", "tests/.*/eip6110" ] diff --git a/validator_client/src/validator_store.rs b/validator_client/src/validator_store.rs index ffe7fd0832..612dd96bcd 100644 --- a/validator_client/src/validator_store.rs +++ b/validator_client/src/validator_store.rs @@ -18,11 +18,12 @@ use std::marker::PhantomData; use std::path::Path; use std::sync::Arc; use task_executor::TaskExecutor; +use types::sidecar::Sidecar; use types::{ attestation::Error as AttestationError, graffiti::GraffitiString, AbstractExecPayload, Address, AggregateAndProof, Attestation, BeaconBlock, BlindedPayload, ChainSpec, ContributionAndProof, Domain, Epoch, EthSpec, Fork, ForkName, Graffiti, Hash256, Keypair, PublicKeyBytes, - SelectionProof, Sidecar, SidecarList, Signature, SignedAggregateAndProof, SignedBeaconBlock, + SelectionProof, SidecarList, Signature, SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedRoot, SignedSidecar, SignedSidecarList, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, SyncAggregatorSelectionData, SyncCommitteeContribution, SyncCommitteeMessage, SyncSelectionProof, SyncSubnetId,