diff --git a/Cargo.lock b/Cargo.lock index 0e669154f5..e7d51d494d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5094,7 +5094,7 @@ dependencies = [ "task_executor", "tokio", "tokio-stream", - "tokio-util 0.6.10", + "tokio-util 0.7.7", "types", ] @@ -8021,6 +8021,7 @@ dependencies = [ "futures-io", "futures-sink", "pin-project-lite 0.2.9", + "slab", "tokio", "tracing", ] diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index e324a80ef4..5aab4cde9a 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -8,7 +8,8 @@ use crate::beacon_proposer_cache::compute_proposer_duties_from_head; use crate::beacon_proposer_cache::BeaconProposerCache; use crate::blob_cache::BlobCache; use crate::blob_verification::{ - AsBlock, AvailableBlock, BlobError, Blobs, BlockWrapper, IntoAvailableBlock, + self, AsBlock, AvailableBlock, BlobError, BlockWrapper, GossipVerifiedBlob, IntoAvailableBlock, + VerifiedBlobs, }; use crate::block_times_cache::BlockTimesCache; use crate::block_verification::{ @@ -25,6 +26,7 @@ use crate::eth1_finalization_cache::{Eth1FinalizationCache, Eth1FinalizationData use crate::events::ServerSentEventHandler; use crate::execution_payload::{get_execution_payload, NotifyExecutionLayer, PreparePayloadHandle}; use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx, ForkChoiceWaitResult}; +use crate::gossip_blob_cache::{Availability, AvailabilityCheckError, DataAvailabilityChecker}; use crate::head_tracker::HeadTracker; use crate::historical_blocks::HistoricalBlockError; use crate::kzg_utils; @@ -76,6 +78,7 @@ use futures::channel::mpsc::Sender; use itertools::process_results; use itertools::Itertools; use kzg::Kzg; +use oneshot_broadcast::Receiver; use operation_pool::{AttestationRef, OperationPool, PersistedOperationPool, ReceivedPreCapella}; use parking_lot::{Mutex, RwLock}; use proto_array::{CountUnrealizedFull, DoNotReOrg, ProposerHeadError}; @@ -112,7 +115,9 @@ use store::{ use task_executor::{ShutdownReason, TaskExecutor}; use tokio::task::JoinHandle; use tree_hash::TreeHash; +use types::beacon_block_body::KzgCommitments; use types::beacon_state::CloneConfig; +use types::blob_sidecar::{BlobIdentifier, BlobSidecarArcList, Blobs}; use types::consts::eip4844::MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS; use types::consts::merge::INTERVALS_PER_SLOT; use types::*; @@ -185,6 +190,25 @@ pub enum WhenSlotSkipped { Prev, } +#[derive(Debug)] +pub enum AvailabilityProcessingStatus { + PendingBlobs(Vec), + PendingBlock(Hash256), + Imported(Hash256), +} + +//TODO(sean) using this in tests for now +impl TryInto for AvailabilityProcessingStatus { + type Error = (); + + fn try_into(self) -> Result { + match self { + AvailabilityProcessingStatus::Imported(hash) => Ok(hash.into()), + _ => Err(()), + } + } +} + /// The result of a chain segment processing. pub enum ChainSegmentResult { /// Processing this chain segment finished successfully. @@ -274,11 +298,6 @@ pub enum StateSkipConfig { WithoutStateRoots, } -pub enum BlockProcessingResult { - Verified(Hash256), - AvailabilityPending(ExecutedBlock), -} - pub trait BeaconChainTypes: Send + Sync + 'static { type HotStore: store::ItemStore; type ColdStore: store::ItemStore; @@ -440,7 +459,8 @@ pub struct BeaconChain { pub slasher: Option>>, /// Provides monitoring of a set of explicitly defined validators. pub validator_monitor: RwLock>, - pub blob_cache: BlobCache, + pub proposal_blob_cache: BlobCache, + pub data_availability_checker: DataAvailabilityChecker, pub kzg: Option>, } @@ -971,35 +991,9 @@ impl BeaconChain { pub async fn get_block_and_blobs_checking_early_attester_cache( &self, block_root: &Hash256, - ) -> Result>, Error> { - // If there is no data availability boundary, the Eip4844 fork is disabled. - if let Some(finalized_data_availability_boundary) = - self.finalized_data_availability_boundary() - { - // Only use the attester cache if we can find both the block and blob - if let (Some(block), Some(blobs)) = ( - self.early_attester_cache.get_block(*block_root), - self.early_attester_cache.get_blobs(*block_root), - ) { - Ok(Some(SignedBeaconBlockAndBlobsSidecar { - beacon_block: block, - blobs_sidecar: blobs, - })) - // Attempt to get the block and blobs from the database - } else if let Some(block) = self.get_block(block_root).await?.map(Arc::new) { - let blobs = self - .get_blobs(block_root, finalized_data_availability_boundary)? - .map(Arc::new); - Ok(blobs.map(|blobs| SignedBeaconBlockAndBlobsSidecar { - beacon_block: block, - blobs_sidecar: blobs, - })) - } else { - Ok(None) - } - } else { - Ok(None) - } + ) -> Result, Error> { + //TODO(sean) use the rpc blobs cache and revert this to the current block cache logic + Ok(Some(())) } /// Returns the block at the given root, if any. @@ -1082,8 +1076,8 @@ impl BeaconChain { pub fn get_blobs( &self, block_root: &Hash256, - ) -> Result>, Error> { - self.store.get_blobs(block_root) + ) -> Result>, Error> { + Ok(self.store.get_blobs(block_root)?) } pub fn get_blinded_block( @@ -1905,6 +1899,15 @@ impl BeaconChain { }) } + pub fn verify_blob_sidecar_for_gossip( + self: &Arc, + blob_sidecar: SignedBlobSidecar, + subnet_id: u64, + ) -> Result, BlobError> // TODO(pawan): make a GossipVerifedBlob type + { + blob_verification::validate_blob_sidecar_for_gossip(blob_sidecar, subnet_id, self) + } + /// Accepts some 'LightClientOptimisticUpdate' from the network and attempts to verify it pub fn verify_optimistic_update_for_gossip( self: &Arc, @@ -2656,13 +2659,29 @@ impl BeaconChain { .map_err(BeaconChainError::TokioJoin)? } + pub async fn process_blob( + self: &Arc, + blob: BlobSidecar, + count_unrealized: CountUnrealized, + ) -> Result> { + self.check_availability_and_maybe_import( + |chain| chain.data_availability_checker.put_blob(Arc::new(blob)), + count_unrealized, + ) + .await + } + /// Returns `Ok(block_root)` if the given `unverified_block` was successfully verified and /// imported into the chain. /// + /// For post deneb blocks, this returns a `BlockError::AvailabilityPending` error + /// if the corresponding blobs are not in the required caches. + /// /// Items that implement `IntoExecutionPendingBlock` include: /// /// - `SignedBeaconBlock` /// - `GossipVerifiedBlock` + /// - `BlockWrapper` /// /// ## Errors /// @@ -2674,14 +2693,13 @@ impl BeaconChain { unverified_block: B, count_unrealized: CountUnrealized, notify_execution_layer: NotifyExecutionLayer, - ) -> Result, BlockError> { + ) -> Result> { // Start the Prometheus timer. let _full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES); // Increment the Prometheus counter for block processing requests. metrics::inc_counter(&metrics::BLOCK_PROCESSING_REQUESTS); - let slot = unverified_block.block().slot(); let chain = self.clone(); let execution_pending = unverified_block.into_execution_pending_block( @@ -2692,65 +2710,20 @@ impl BeaconChain { // TODO(log required errors) let executed_block = self + .clone() .into_executed_block(execution_pending, count_unrealized) - .await?; + .await + .map_err(|e| self.handle_block_error(e))?; - // Check if the executed block has all it's blobs available to qualify as a fully - // available block - let import_block = if let Ok(blobs) = self - .gossip_blob_cache - .lock() - .blobs(executed_block.block_root) - { - self.import_available_block(executed_block, blobs, count_unrealized) - } else { - return Ok(BlockProcessingResult::AvailabilityPending(executed_block)); - }; - - // Verify and import the block. - match import_block.await { - // The block was successfully verified and imported. Yay. - Ok(block_root) => { - trace!( - self.log, - "Beacon block imported"; - "block_root" => ?block_root, - "block_slot" => slot, - ); - - // Increment the Prometheus counter for block processing successes. - metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES); - - Ok(BlockProcessingResult::Verified(block_root)) - } - Err(e @ BlockError::BeaconChainError(BeaconChainError::TokioJoin(_))) => { - debug!( - self.log, - "Beacon block processing cancelled"; - "error" => ?e, - ); - Err(e) - } - // There was an error whilst attempting to verify and import the block. The block might - // be partially verified or partially imported. - Err(BlockError::BeaconChainError(e)) => { - crit!( - self.log, - "Beacon block processing error"; - "error" => ?e, - ); - Err(BlockError::BeaconChainError(e)) - } - // The block failed verification. - Err(other) => { - trace!( - self.log, - "Beacon block rejected"; - "reason" => other.to_string(), - ); - Err(other) - } - } + self.check_availability_and_maybe_import( + |chain| { + chain + .data_availability_checker + .check_block_availability(executed_block) + }, + count_unrealized, + ) + .await } /// Accepts a fully-verified block and awaits on it's payload verification handle to @@ -2761,7 +2734,7 @@ impl BeaconChain { self: Arc, execution_pending_block: ExecutionPendingBlock, count_unrealized: CountUnrealized, - ) -> Result, BlockError> { + ) -> Result, BlockError> { let ExecutionPendingBlock { block, block_root, @@ -2819,55 +2792,118 @@ impl BeaconChain { }) } + fn handle_block_error(&self, e: BlockError) -> BlockError { + match e { + e @ BlockError::BeaconChainError(BeaconChainError::TokioJoin(_)) => { + debug!( + self.log, + "Beacon block processing cancelled"; + "error" => ?e, + ); + e + } + BlockError::BeaconChainError(e) => { + crit!( + self.log, + "Beacon block processing error"; + "error" => ?e, + ); + BlockError::BeaconChainError(e) + } + other => { + trace!( + self.log, + "Beacon block rejected"; + "reason" => other.to_string(), + ); + other + } + } + } + /// Accepts a fully-verified, available block and imports it into the chain without performing any /// additional verification. /// /// An error is returned if the block was unable to be imported. It may be partially imported /// (i.e., this function is not atomic). - async fn import_available_block( - self: Arc, - executed_block: ExecutedBlock, - blobs: Blobs, + pub async fn check_availability_and_maybe_import( + self: &Arc, + cache_fn: impl FnOnce(Arc) -> Result, AvailabilityCheckError>, count_unrealized: CountUnrealized, - ) -> Result> { - let ExecutedBlock { - block, - block_root, - state, - parent_block, - confirmed_state_roots, - payload_verification_outcome, - parent_eth1_finalization_data, - consensus_context, - } = executed_block; + ) -> Result> { + let availability = cache_fn(self.clone())?; + match availability { + Availability::Available(block) => { + let ExecutedBlock { + block, + block_root, + state, + parent_block, + parent_eth1_finalization_data, + confirmed_state_roots, + consensus_context, + payload_verification_outcome, + } = block; - let chain = self.clone(); + let available_block = match block { + BlockWrapper::Available(block) => block, + BlockWrapper::AvailabilityPending(_) => { + todo!() // logic error + } + }; - let available_block = AvailableBlock { - block: block.block_cloned(), - blobs: blobs, - }; + let slot = available_block.block.slot(); - let block_hash = self - .spawn_blocking_handle( - move || { - chain.import_block( - available_block, - block_root, - state, - confirmed_state_roots, - payload_verification_outcome.payload_verification_status, - count_unrealized, - parent_block, - parent_eth1_finalization_data, - consensus_context, + // import + let chain = self.clone(); + let result = self + .spawn_blocking_handle( + move || { + chain.import_block( + available_block, + block_root, + state, + confirmed_state_roots, + payload_verification_outcome.payload_verification_status, + count_unrealized, + parent_block, + parent_eth1_finalization_data, + consensus_context, + ) + }, + "payload_verification_handle", ) - }, - "payload_verification_handle", - ) - .await??; + .await + .map_err(|e| { + let b = BlockError::from(e); + self.handle_block_error(b) + })?; - Ok(block_hash) + match result { + // The block was successfully verified and imported. Yay. + Ok(block_root) => { + trace!( + self.log, + "Beacon block imported"; + "block_root" => ?block_root, + "block_slot" => slot, + ); + + // Increment the Prometheus counter for block processing successes. + metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES); + + Ok(AvailabilityProcessingStatus::Imported(block_root)) + } + Err(e) => Err(self.handle_block_error(e)), + } + } + Availability::PendingBlock(block_root) => { + Ok(AvailabilityProcessingStatus::PendingBlock(block_root)) + } + Availability::PendingBlobs(blob_ids) => { + Ok(AvailabilityProcessingStatus::PendingBlobs(blob_ids)) + } + } } /// Accepts a fully-verified and available block and imports it into the chain without performing any @@ -4802,12 +4838,11 @@ impl BeaconChain { .as_ref() .ok_or(BlockProductionError::TrustedSetupNotInitialized)?; let beacon_block_root = block.canonical_root(); - let expected_kzg_commitments: &KzgCommitments = - block.body().blob_kzg_commitments().map_err(|_| { - BlockProductionError::InvalidBlockVariant( - "EIP4844 block does not contain kzg commitments".to_string(), - ) - })?; + let expected_kzg_commitments = block.body().blob_kzg_commitments().map_err(|_| { + BlockProductionError::InvalidBlockVariant( + "EIP4844 block does not contain kzg commitments".to_string(), + ) + })?; if expected_kzg_commitments.len() != blobs.len() { return Err(BlockProductionError::MissingKzgCommitment(format!( @@ -4856,7 +4891,8 @@ impl BeaconChain { .collect::>, BlockProductionError>>()?, ); - self.blob_cache.put(beacon_block_root, blob_sidecars); + self.proposal_blob_cache + .put(beacon_block_root, blob_sidecars); } metrics::inc_counter(&metrics::BLOCK_PRODUCTION_SUCCESSES); @@ -4874,8 +4910,8 @@ impl BeaconChain { fn compute_blob_kzg_proofs( kzg: &Arc, - blobs: &Blobs<::EthSpec>, - expected_kzg_commitments: &KzgCommitments<::EthSpec>, + blobs: &Blobs, + expected_kzg_commitments: &KzgCommitments, slot: Slot, ) -> Result, BlockProductionError> { blobs @@ -6173,49 +6209,6 @@ impl BeaconChain { .map(|fork_epoch| fork_epoch <= current_epoch) .unwrap_or(false)) } - - pub async fn check_data_availability( - &self, - block: Arc>, - ) -> Result, Error> { - let kzg_commitments = block - .message() - .body() - .blob_kzg_commitments() - .map_err(|_| BlobError::KzgCommitmentMissing)?; - let transactions = block - .message() - .body() - .execution_payload_eip4844() - .map(|payload| payload.transactions()) - .map_err(|_| BlobError::TransactionsMissing)? - .ok_or(BlobError::TransactionsMissing)?; - - if verify_kzg_commitments_against_transactions::(transactions, kzg_commitments) - .is_err() - { - return Err(BlobError::TransactionCommitmentMismatch); - } - - // Validate that the kzg proof is valid against the commitments and blobs - let kzg = self - .kzg - .as_ref() - .ok_or(BlobError::TrustedSetupNotInitialized)?; - - if !kzg_utils::validate_blobs_sidecar( - kzg, - block_slot, - block_root, - kzg_commitments, - blob_sidecar, - ) - .map_err(BlobError::KzgError)? - { - return Err(BlobError::InvalidKzgProof); - } - Ok(()) - } } impl Drop for BeaconChain { @@ -6266,4 +6259,4 @@ impl ChainSegmentResult { ChainSegmentResult::Successful { .. } => Ok(()), } } -} \ No newline at end of file +} diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index 35fb4c1429..9736e6c400 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -1,15 +1,21 @@ use slot_clock::SlotClock; +use state_processing::ConsensusContext; use std::sync::Arc; use crate::beacon_chain::{ BeaconChain, BeaconChainTypes, MAXIMUM_GOSSIP_CLOCK_DISPARITY, VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT, }; +use crate::gossip_blob_cache::AvailabilityCheckError; +use crate::snapshot_cache::PreProcessingSnapshot; use crate::BeaconChainError; +use derivative::Derivative; use state_processing::per_block_processing::eip4844::eip4844::verify_kzg_commitments_against_transactions; +use types::blob_sidecar::BlobSidecarArcList; use types::{ - BeaconBlockRef, BeaconStateError, BlobSidecarList, Epoch, EthSpec, Hash256, KzgCommitment, - SignedBeaconBlock, SignedBeaconBlockHeader, SignedBlobSidecar, Slot, Transactions, + BeaconBlockRef, BeaconStateError, BlobSidecar, BlobSidecarList, Epoch, EthSpec, Hash256, + KzgCommitment, SignedBeaconBlock, SignedBeaconBlockHeader, SignedBlobSidecar, Slot, + Transactions, }; #[derive(Debug)] @@ -107,6 +113,8 @@ pub enum BlobError { /// /// The block is invalid and the peer is faulty. UnknownValidator(u64), + + BlobCacheError(AvailabilityCheckError), } impl From for BlobError { @@ -121,14 +129,27 @@ impl From for BlobError { } } +/// A wrapper around a `BlobSidecar` that indicates it has been approved for re-gossiping on +/// the p2p network. +#[derive(Debug)] +pub struct GossipVerifiedBlob { + blob: BlobSidecar, +} + +impl GossipVerifiedBlob { + pub fn to_blob(self) -> BlobSidecar { + self.blob + } +} + pub fn validate_blob_sidecar_for_gossip( - blob_sidecar: SignedBlobSidecar, + signed_blob_sidecar: SignedBlobSidecar, subnet: u64, chain: &BeaconChain, -) -> Result<(), BlobError> { - let blob_slot = blob_sidecar.message.slot; - let blob_index = blob_sidecar.message.index; - let block_root = blob_sidecar.message.block_root; +) -> Result, BlobError> { + let blob_slot = signed_blob_sidecar.message.slot; + let blob_index = signed_blob_sidecar.message.index; + let block_root = signed_blob_sidecar.message.block_root; // Verify that the blob_sidecar was received on the correct subnet. if blob_index != subnet { @@ -167,7 +188,7 @@ pub fn validate_blob_sidecar_for_gossip( // TODO(pawan): should we verify locally that the parent root is correct // or just use whatever the proposer gives us? - let proposer_shuffling_root = blob_sidecar.message.block_parent_root; + let proposer_shuffling_root = signed_blob_sidecar.message.block_parent_root; let (proposer_index, fork) = match chain .beacon_proposer_cache @@ -184,7 +205,7 @@ pub fn validate_blob_sidecar_for_gossip( } }; - let blob_proposer_index = blob_sidecar.message.proposer_index; + let blob_proposer_index = signed_blob_sidecar.message.proposer_index; if proposer_index != blob_proposer_index as usize { return Err(BlobError::ProposerIndexMismatch { sidecar: blob_proposer_index as usize, @@ -203,7 +224,7 @@ pub fn validate_blob_sidecar_for_gossip( .get(proposer_index as usize) .ok_or_else(|| BlobError::UnknownValidator(proposer_index as u64))?; - blob_sidecar.verify_signature( + signed_blob_sidecar.verify_signature( None, pubkey, &fork, @@ -221,8 +242,6 @@ pub fn validate_blob_sidecar_for_gossip( // TODO(pawan): Check if other blobs for the same proposer index and blob index have been // received and drop if required. - // TODO(pawan): potentially add to a seen cache at this point. - // Verify if the corresponding block for this blob has been received. // Note: this should be the last gossip check so that we can forward the blob // over the gossip network even if we haven't received the corresponding block yet @@ -233,13 +252,16 @@ pub fn validate_blob_sidecar_for_gossip( .get_block(&block_root) .or_else(|| chain.early_attester_cache.get_proto_block(block_root)); // TODO(pawan): should we be checking this cache? + // TODO(pawan): this may be redundant with the new `AvailabilityProcessingStatus::PendingBlock variant` if block_opt.is_none() { return Err(BlobError::UnknownHeadBlock { beacon_block_root: block_root, }); } - Ok(()) + Ok(GossipVerifiedBlob { + blob: signed_blob_sidecar.message, + }) } pub fn verify_data_availability( @@ -301,39 +323,45 @@ impl IntoAvailableBlock for BlockWrapper { } } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Derivative)] +#[derivative(Hash(bound = "T: EthSpec"))] pub struct AvailableBlock { pub block: Arc>, - pub blobs: Blobs, + pub blobs: VerifiedBlobs, } impl AvailableBlock { - pub fn blobs(&self) -> Option>> { + pub fn blobs(&self) -> Option> { match &self.blobs { - Blobs::NotRequired | Blobs::None => None, - Blobs::Available(blobs) => Some(blobs.clone()), + VerifiedBlobs::EmptyBlobs | VerifiedBlobs::NotRequired | VerifiedBlobs::PreEip4844 => { + None + } + VerifiedBlobs::Available(blobs) => Some(blobs.clone()), } } - pub fn deconstruct(self) -> (Arc>, Option>>) { + pub fn deconstruct(self) -> (Arc>, Option>) { match self.blobs { - Blobs::NotRequired | Blobs::None => (self.block, None), - Blobs::Available(blobs) => (self.block, Some(blobs)), + VerifiedBlobs::EmptyBlobs | VerifiedBlobs::NotRequired | VerifiedBlobs::PreEip4844 => { + (self.block, None) + } + VerifiedBlobs::Available(blobs) => (self.block, Some(blobs)), } } } -#[derive(Clone, Debug, PartialEq)] -pub enum Blobs { +#[derive(Clone, Debug, PartialEq, Derivative)] +#[derivative(Hash(bound = "E: EthSpec"))] +pub enum VerifiedBlobs { /// These blobs are available. - Available(Arc>), + Available(BlobSidecarArcList), /// This block is from outside the data availability boundary so doesn't require /// a data availability check. NotRequired, /// The block's `kzg_commitments` field is empty so it does not contain any blobs. EmptyBlobs, /// This is a block prior to the 4844 fork, so doesn't require any blobs - None, + PreEip4844, } pub trait AsBlock { @@ -348,7 +376,8 @@ pub trait AsBlock { fn canonical_root(&self) -> Hash256; } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Derivative)] +#[derivative(Hash(bound = "E: EthSpec"))] pub enum BlockWrapper { /// This variant is fully available. /// i.e. for pre-4844 blocks, it contains a (`SignedBeaconBlock`, `Blobs::None`) and for diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 544f484663..7092576f0b 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -54,6 +54,7 @@ use crate::execution_payload::{ is_optimistic_candidate_block, validate_execution_payload_for_gossip, validate_merge_block, AllowOptimisticImport, NotifyExecutionLayer, PayloadNotifier, }; +use crate::gossip_blob_cache::AvailabilityCheckError; use crate::snapshot_cache::PreProcessingSnapshot; use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS; use crate::validator_pubkey_cache::ValidatorPubkeyCache; @@ -64,6 +65,7 @@ use crate::{ }, metrics, BeaconChain, BeaconChainError, BeaconChainTypes, }; +use derivative::Derivative; use eth2::types::EventKind; use execution_layer::PayloadStatus; use fork_choice::{AttestationFromBlock, PayloadVerificationStatus}; @@ -306,6 +308,7 @@ pub enum BlockError { parent_root: Hash256, }, BlobValidation(BlobError), + AvailabilityCheck(AvailabilityCheckError), } impl From for BlockError { @@ -314,6 +317,12 @@ impl From for BlockError { } } +impl From for BlockError { + fn from(e: AvailabilityCheckError) -> Self { + Self::AvailabilityCheck(e) + } +} + /// Returned when block validation failed due to some issue verifying /// the execution payload. #[derive(Debug)] @@ -487,6 +496,7 @@ impl From for BlockError { } /// Stores information about verifying a payload against an execution engine. +#[derive(Clone)] pub struct PayloadVerificationOutcome { pub payload_verification_status: PayloadVerificationStatus, pub is_valid_merge_transition_block: bool, @@ -619,7 +629,8 @@ pub fn signature_verify_chain_segment( /// A wrapper around a `SignedBeaconBlock` that indicates it has been approved for re-gossiping on /// the p2p network. -#[derive(Debug)] +#[derive(Derivative)] +#[derivative(Debug(bound = "T: BeaconChainTypes"))] pub struct GossipVerifiedBlock { pub block: BlockWrapper, pub block_root: Hash256, @@ -663,17 +674,24 @@ pub struct ExecutionPendingBlock { pub payload_verification_handle: PayloadVerificationHandle, } -pub struct ExecutedBlock { - pub block: BlockWrapper, +#[derive(Clone)] +pub struct ExecutedBlock { + pub block: BlockWrapper, pub block_root: Hash256, - pub state: BeaconState, - pub parent_block: SignedBeaconBlock>, + pub state: BeaconState, + pub parent_block: SignedBeaconBlock>, pub parent_eth1_finalization_data: Eth1FinalizationData, pub confirmed_state_roots: Vec, - pub consensus_context: ConsensusContext, + pub consensus_context: ConsensusContext, pub payload_verification_outcome: PayloadVerificationOutcome, } +impl std::fmt::Debug for ExecutedBlock { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self.block) + } +} + /// Implemented on types that can be converted into a `ExecutionPendingBlock`. /// /// Used to allow functions to accept blocks at various stages of verification. @@ -1156,7 +1174,7 @@ impl IntoExecutionPendingBlock for BlockWrapper block .into_execution_pending_block_slashable(block_root, chain, notify_execution_layer), BlockWrapper::Available(AvailableBlock { block, blobs }) => { - let execution_pending_block = block.into_execution_pending_block_slashable( + let mut execution_pending_block = block.into_execution_pending_block_slashable( block_root, chain, notify_execution_layer, diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 2d92ad6cb1..7da25b207f 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -1,9 +1,11 @@ use crate::beacon_chain::{CanonicalHead, BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, OP_POOL_DB_KEY}; use crate::blob_cache::BlobCache; +use crate::block_verification::ExecutedBlock; use crate::eth1_chain::{CachingEth1Backend, SszEth1}; use crate::eth1_finalization_cache::Eth1FinalizationCache; use crate::fork_choice_signal::ForkChoiceSignalTx; use crate::fork_revert::{reset_fork_choice_to_finalization, revert_to_fork_boundary}; +use crate::gossip_blob_cache::DataAvailabilityChecker; use crate::head_tracker::HeadTracker; use crate::migrate::{BackgroundMigrator, MigratorConfig}; use crate::persisted_beacon_chain::PersistedBeaconChain; @@ -644,7 +646,8 @@ where let kzg = if let Some(trusted_setup) = self.trusted_setup { let kzg = Kzg::new_from_trusted_setup(trusted_setup) .map_err(|e| format!("Failed to load trusted setup: {:?}", e))?; - Some(Arc::new(kzg)) + let kzg_arc = Arc::new(kzg); + Some(kzg_arc) } else { None }; @@ -850,7 +853,9 @@ where graffiti: self.graffiti, slasher: self.slasher.clone(), validator_monitor: RwLock::new(validator_monitor), - blob_cache: BlobCache::default(), + //TODO(sean) should we move kzg solely to the da checker? + data_availability_checker: DataAvailabilityChecker::new(kzg.clone()), + proposal_blob_cache: BlobCache::default(), kzg, }; diff --git a/beacon_node/beacon_chain/src/early_attester_cache.rs b/beacon_node/beacon_chain/src/early_attester_cache.rs index 2df1a4f05b..2520af41a2 100644 --- a/beacon_node/beacon_chain/src/early_attester_cache.rs +++ b/beacon_node/beacon_chain/src/early_attester_cache.rs @@ -1,4 +1,4 @@ -use crate::blob_verification::{AvailabilityPendingBlock, AvailableBlock}; +use crate::blob_verification::AvailableBlock; use crate::{ attester_cache::{CommitteeLengths, Error}, metrics, @@ -6,6 +6,7 @@ use crate::{ use parking_lot::RwLock; use proto_array::Block as ProtoBlock; use std::sync::Arc; +use types::blob_sidecar::BlobSidecarArcList; use types::*; pub struct CacheItem { @@ -21,7 +22,8 @@ pub struct CacheItem { * Values used to make the block available. */ block: Arc>, - blobs: Option>>, + //TODO(sean) remove this and just use the da checker?' + blobs: Option>, proto_block: ProtoBlock, } @@ -160,7 +162,7 @@ impl EarlyAttesterCache { } /// Returns the blobs, if `block_root` matches the cached item. - pub fn get_blobs(&self, block_root: Hash256) -> Option>> { + pub fn get_blobs(&self, block_root: Hash256) -> Option> { self.item .read() .as_ref() diff --git a/beacon_node/beacon_chain/src/execution_payload.rs b/beacon_node/beacon_chain/src/execution_payload.rs index bb2d3c5b44..b561b6ea1e 100644 --- a/beacon_node/beacon_chain/src/execution_payload.rs +++ b/beacon_node/beacon_chain/src/execution_payload.rs @@ -134,9 +134,9 @@ impl PayloadNotifier { /// contains a few extra checks by running `partially_verify_execution_payload` first: /// /// https://github.com/ethereum/consensus-specs/blob/v1.1.9/specs/bellatrix/beacon-chain.md#notify_new_payload -async fn notify_new_payload( +async fn notify_new_payload<'a, T: BeaconChainTypes>( chain: &Arc>, - block: BeaconBlockRef, + block: BeaconBlockRef<'a, T::EthSpec>, ) -> Result> { let execution_payload = block.execution_payload()?; diff --git a/beacon_node/beacon_chain/src/gossip_blob_cache.rs b/beacon_node/beacon_chain/src/gossip_blob_cache.rs index 2904a2bb85..51e937b3bd 100644 --- a/beacon_node/beacon_chain/src/gossip_blob_cache.rs +++ b/beacon_node/beacon_chain/src/gossip_blob_cache.rs @@ -1,47 +1,63 @@ -use std::collections::{BTreeMap, HashMap, HashSet}; -use std::sync::Arc; +use crate::blob_verification::{ + verify_data_availability, AsBlock, AvailableBlock, BlockWrapper, VerifiedBlobs, +}; +use crate::block_verification::{ExecutedBlock, IntoExecutionPendingBlock}; +use crate::kzg_utils::validate_blob; +use crate::{BeaconChain, BeaconChainError, BeaconChainTypes, BlockError}; +use kzg::Error as KzgError; +use kzg::{Kzg, KzgCommitment}; use parking_lot::{Mutex, RwLock}; -use kzg::KzgCommitment; -use ssz_types::VariableList; +use ssz_types::{Error, VariableList}; +use std::collections::hash_map::Entry; +use std::collections::{BTreeMap, HashMap, HashSet}; +use std::future::Future; +use std::sync::{mpsc, Arc}; +use tokio::sync::mpsc::Sender; use types::blob_sidecar::{BlobIdentifier, BlobSidecar}; -use types::{EthSpec, Hash256}; -use crate::blob_verification::{AvailabilityPendingBlock, verify_data_availability}; -use crate::block_verification::IntoExecutionPendingBlock; +use types::{EthSpec, Hash256, SignedBeaconBlock, SignedBlobSidecar}; + +#[derive(Debug)] +pub enum AvailabilityCheckError { + DuplicateBlob(Hash256), + Kzg(KzgError), + SszTypes(ssz_types::Error), +} + +impl From for AvailabilityCheckError { + fn from(value: Error) -> Self { + Self::SszTypes(value) + } +} /// This cache contains /// - blobs that have been gossip verified /// - commitments for blocks that have been gossip verified, but the commitments themselves /// have not been verified against blobs /// - blocks that have been fully verified and only require a data availability check -pub struct GossipBlobCache { - blob_cache: Mutex> +pub struct DataAvailabilityChecker { + rpc_blob_cache: RwLock>>>, + gossip_blob_cache: Mutex>>, + kzg: Option>, } -struct GossipBlobCacheInner { - // used when all blobs are not yet present and when the block is not yet present - - //TODO(sean) do we want two versions of this cache, one meant to serve RPC? - unverified_blobs: BTreeMap>>, - // used when the block was fully processed before we received all blobs - availability_pending_blocks: HashMap>, - // used to cache kzg commitments from gossip verified blocks in case we receive all blobs during block processing - unverified_commitments: HashMap>, - // used when block + blob kzg verification completes prior before block processing - verified_commitments: HashSet, +pub enum Availability { + PendingBlobs(Vec), + PendingBlock(Hash256), + Available(ExecutedBlock), } -impl GossipBlobCache { - pub fn new() -> Self { +struct GossipBlobCache { + verified_blobs: Vec>>, + executed_block: Option>, +} +impl DataAvailabilityChecker { + pub fn new(kzg: Option>) -> Self { Self { - blob_cache: Mutex::new(GossipBlobCacheInner { - unverified_blobs: BTreeMap::new(), - availability_pending_blocks: HashMap::new(), - unverified_commitments: HashMap::new(), - verified_commitments: HashSet::new(), - }) + rpc_blob_cache: <_>::default(), + gossip_blob_cache: <_>::default(), + kzg, } - } /// When we receive a blob check if we've cached it. If it completes a set and we have the @@ -49,59 +65,192 @@ impl GossipBlobCache { /// cached, verify the block and import it. /// /// This should only accept gossip verified blobs, so we should not have to worry about dupes. - pub fn put_blob(&self, blob: Arc>) { - let blob_id = blob.id(); - let blob_cache = self.blob_cache.lock(); - - if let Some(dup) = blob_cache.unverified_blobs.insert(blob_id, blob) { - // return error relating to gossip validation failure - } - - if let Some(availability_pending_block) = blob_cache.availability_pending_blocks.get(&blob.block_root) { - let num_blobs = availability_pending_block.kzg_commitments().len(); - let mut blobs : Vec> = blob_cache.unverified_blobs.range(BlobIdentifier::new(blob.block_root, 0) - ..BlobIdentifier::new(blob.block_root, num_blobs as u64)).collect(); - - if blobs.len() == num_blobs { - // verify - // import - } - } else if let Some(commitments) = blob_cache.unverified_commitments.get(&blob.block_root) { - let num_blobs = commitments.len(); - let mut blobs : Vec> = blob_cache.unverified_blobs.range(BlobIdentifier::new(blob.block_root, 0) - ..BlobIdentifier::new(blob.block_root, num_blobs as u64)).collect(); - - if blobs.len() == num_blobs { - // verify - // cache - } - } - } - - - pub fn put_commitments(&self, block_root: Hash256, kzg_commitments: VariableList) { - let blob_cache = self.blob_cache.lock(); - if let Some(dup) = blob_cache.unverified_commitments.insert(block_root, kzg_commitments) { - // return error relating to gossip validation failure - } - - let num_blobs = commitments.len(); - let mut blobs : Vec> = blob_cache.unverified_blobs.range(BlobIdentifier::new(blob.block_root, 0) - ..BlobIdentifier::new(blob.block_root, num_blobs as u64)).collect(); - - if blobs.len() == num_blobs { - // verify - // cache - } - } - - pub fn check_availability_and_import(&self, block_root: Hash256, block: AvailabilityPendingBlock) -> bool { - let blob_cache = self.blob_cache.lock(); - if blob_cache.verified_commitments.contains(&block_root) { - true - } else { - // cache the block + // return an enum here that may include the full block + pub fn put_blob( + &self, + blob: Arc>, + ) -> Result, AvailabilityCheckError> { + let verified = if let Some(kzg) = self.kzg.as_ref() { + validate_blob::( + kzg, + blob.blob.clone(), + blob.kzg_commitment.clone(), + blob.kzg_proof, + ) + .map_err(|e| AvailabilityCheckError::Kzg(e))? + } else { false - } + // error wrong fork + }; + + // TODO(remove clones) + + if verified { + let mut blob_cache = self.gossip_blob_cache.lock(); + + // Gossip cache. + blob_cache + .entry(blob.block_root) + .and_modify(|mut inner| { + // All blobs reaching this cache should be gossip verified and gossip verification + // should filter duplicates, as well as validate indices. + inner + .verified_blobs + .insert(blob.index as usize, blob.clone()); + + if let Some(executed_block) = inner.executed_block.take() { + let verified_commitments: Vec<_> = inner + .verified_blobs + .iter() + .map(|blob| blob.kzg_commitment) + .collect(); + if verified_commitments + == executed_block + .block + .as_block() + .message_eip4844() + .unwrap() //TODO(sean) errors + .body + .blob_kzg_commitments + .clone() + .to_vec() + { + // send to reprocessing queue ? + //TODO(sean) try_send? + //TODO(sean) errors + } else { + let _ = inner.executed_block.insert(executed_block); + } + } + }) + .or_insert(GossipBlobCache { + verified_blobs: vec![blob.clone()], + executed_block: None, + }); + + drop(blob_cache); + + // RPC cache. + self.rpc_blob_cache.write().insert(blob.id(), blob.clone()); + } + + Ok(Availability::PendingBlobs(vec![])) + } + + // return an enum here that may include the full block + pub fn check_block_availability( + &self, + executed_block: ExecutedBlock, + ) -> Result, AvailabilityCheckError> { + let block_clone = executed_block.block.clone(); + + let availability = match block_clone { + BlockWrapper::Available(available_block) => Availability::Available(executed_block), + BlockWrapper::AvailabilityPending(block) => { + if let Ok(kzg_commitments) = block.message().body().blob_kzg_commitments() { + // first check if the blockwrapper contains blobs, if so, use those + + let mut guard = self.gossip_blob_cache.lock(); + let entry = guard.entry(executed_block.block_root); + + match entry { + Entry::Occupied(mut occupied_entry) => { + let cache: &mut GossipBlobCache = occupied_entry.get_mut(); + + let verified_commitments: Vec<_> = cache + .verified_blobs + .iter() + .map(|blob| blob.kzg_commitment) + .collect(); + if verified_commitments == kzg_commitments.clone().to_vec() { + let removed: GossipBlobCache = occupied_entry.remove(); + + let ExecutedBlock { + block: _, + block_root, + state, + parent_block, + parent_eth1_finalization_data, + confirmed_state_roots, + consensus_context, + payload_verification_outcome, + } = executed_block; + + let available_block = BlockWrapper::Available(AvailableBlock { + block, + blobs: VerifiedBlobs::Available(VariableList::new( + removed.verified_blobs, + )?), + }); + + let available_executed = ExecutedBlock { + block: available_block, + block_root, + state, + parent_block, + parent_eth1_finalization_data, + confirmed_state_roots, + consensus_context, + payload_verification_outcome, + }; + Availability::Available(available_executed) + } else { + let mut missing_blobs = Vec::with_capacity(kzg_commitments.len()); + for i in 0..kzg_commitments.len() { + if cache.verified_blobs.get(i).is_none() { + missing_blobs.push(BlobIdentifier { + block_root: executed_block.block_root, + index: i as u64, + }) + } + } + + //TODO(sean) add a check that missing blobs > 0 + + let _ = cache.executed_block.insert(executed_block.clone()); + // log that we cached the block? + Availability::PendingBlobs(missing_blobs) + } + } + Entry::Vacant(vacant_entry) => { + let mut blob_ids = Vec::with_capacity(kzg_commitments.len()); + for i in 0..kzg_commitments.len() { + blob_ids.push(BlobIdentifier { + block_root: executed_block.block_root, + index: i as u64, + }); + } + + vacant_entry.insert(GossipBlobCache { + verified_blobs: vec![], + executed_block: Some(executed_block), + }); + + Availability::PendingBlobs(blob_ids) + } + } + } else { + Availability::Available(executed_block) + } + } + }; + Ok(availability) + } + + /// Adds the blob to the cache. Returns true if adding the blob completes + /// all the required blob sidecars for a given block root. + /// + /// Note: we can only know this if we know `block.kzg_commitments.len()` + pub fn put_blob_temp( + &self, + blob: Arc>, + ) -> Result { + unimplemented!() + } + + /// Returns all blobs associated with a given block root otherwise returns + /// a UnavailableBlobs error. + pub fn blobs(&self, block_root: Hash256) -> Result, AvailabilityCheckError> { + unimplemented!() } } diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 74b641f352..1a0247f99a 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -23,6 +23,7 @@ pub mod events; pub mod execution_payload; pub mod fork_choice_signal; pub mod fork_revert; +pub mod gossip_blob_cache; mod head_tracker; pub mod historical_blocks; pub mod kzg_utils; @@ -51,12 +52,12 @@ pub mod test_utils; mod timeout_rw_lock; pub mod validator_monitor; pub mod validator_pubkey_cache; -pub mod gossip_blob_cache; pub use self::beacon_chain::{ - AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, BeaconStore, ChainSegmentResult, - CountUnrealized, ForkChoiceError, OverrideForkchoiceUpdate, ProduceBlockVerification, - StateSkipConfig, WhenSlotSkipped, INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON, + AttestationProcessingOutcome, AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, + BeaconStore, ChainSegmentResult, CountUnrealized, ForkChoiceError, OverrideForkchoiceUpdate, + ProduceBlockVerification, StateSkipConfig, WhenSlotSkipped, + INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON, INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON, MAXIMUM_GOSSIP_CLOCK_DISPARITY, }; pub use self::beacon_snapshot::BeaconSnapshot; @@ -66,7 +67,7 @@ pub use self::historical_blocks::HistoricalBlockError; pub use attestation_verification::Error as AttestationError; pub use beacon_fork_choice_store::{BeaconForkChoiceStore, Error as ForkChoiceStoreError}; pub use block_verification::{ - get_block_root, BlockError, ExecutionPayloadError, GossipVerifiedBlock, + get_block_root, BlockError, ExecutedBlock, ExecutionPayloadError, GossipVerifiedBlock, }; pub use canonical_head::{CachedHead, CanonicalHead, CanonicalHeadRwLock}; pub use eth1_chain::{Eth1Chain, Eth1ChainBackend}; diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index a784c67411..df2545adf1 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -1680,7 +1680,8 @@ where NotifyExecutionLayer::Yes, ) .await? - .into(); + .try_into() + .unwrap(); self.chain.recompute_head_at_current_slot().await; Ok(block_hash) } @@ -1698,7 +1699,8 @@ where NotifyExecutionLayer::Yes, ) .await? - .into(); + .try_into() + .unwrap(); self.chain.recompute_head_at_current_slot().await; Ok(block_hash) } diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index e80b6fd18c..41e08bfdca 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -862,6 +862,9 @@ where .ok_or("beacon_chain requires a runtime context")? .clone(); + let (tx, mut rx) = + tokio::sync::mpsc::channel::>(1000); //TODO(sean) capacity + let chain = self .beacon_chain_builder .ok_or("beacon_chain requires a beacon_chain_builder")? diff --git a/beacon_node/execution_layer/src/engine_api/json_structures.rs b/beacon_node/execution_layer/src/engine_api/json_structures.rs index ed9a58f266..1cb6235a41 100644 --- a/beacon_node/execution_layer/src/engine_api/json_structures.rs +++ b/beacon_node/execution_layer/src/engine_api/json_structures.rs @@ -2,9 +2,10 @@ use super::*; use serde::{Deserialize, Serialize}; use strum::EnumString; use superstruct::superstruct; -use types::blobs_sidecar::KzgCommitments; +use types::beacon_block_body::KzgCommitments; +use types::blob_sidecar::Blobs; use types::{ - Blobs, EthSpec, ExecutionBlockHash, ExecutionPayload, ExecutionPayloadCapella, + EthSpec, ExecutionBlockHash, ExecutionPayload, ExecutionPayloadCapella, ExecutionPayloadEip4844, ExecutionPayloadMerge, FixedVector, Transaction, Unsigned, VariableList, Withdrawal, }; diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index adfdcd9f6f..fd9b794175 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -41,15 +41,16 @@ use tokio::{ }; use tokio_stream::wrappers::WatchStream; use tree_hash::TreeHash; +use types::beacon_block_body::KzgCommitments; +use types::blob_sidecar::Blobs; use types::consts::eip4844::BLOB_TX_TYPE; use types::transaction::{AccessTuple, BlobTransaction, EcdsaSignature, SignedBlobTransaction}; use types::Withdrawals; +use types::{AbstractExecPayload, BeaconStateError, ExecPayload, VersionedHash}; use types::{ - blobs_sidecar::{Blobs, KzgCommitments}, BlindedPayload, BlockType, ChainSpec, Epoch, ExecutionBlockHash, ExecutionPayload, ExecutionPayloadCapella, ExecutionPayloadEip4844, ExecutionPayloadMerge, ForkName, }; -use types::{AbstractExecPayload, BeaconStateError, ExecPayload, VersionedHash}; use types::{ ProposerPreparationData, PublicKeyBytes, Signature, SignedBeaconBlock, Slot, Transaction, Uint256, diff --git a/beacon_node/http_api/src/block_id.rs b/beacon_node/http_api/src/block_id.rs index b484f4079a..d53bd793e8 100644 --- a/beacon_node/http_api/src/block_id.rs +++ b/beacon_node/http_api/src/block_id.rs @@ -4,7 +4,7 @@ use eth2::types::BlockId as CoreBlockId; use std::fmt; use std::str::FromStr; use std::sync::Arc; -use types::{BlobsSidecar, Hash256, SignedBeaconBlock, SignedBlindedBeaconBlock, Slot}; +use types::{BlobSidecar, Hash256, SignedBeaconBlock, SignedBlindedBeaconBlock, Slot}; /// Wraps `eth2::types::BlockId` and provides a simple way to obtain a block or root for a given /// `BlockId`. @@ -216,13 +216,13 @@ impl BlockId { pub async fn blobs_sidecar( &self, chain: &BeaconChain, - ) -> Result>, warp::Rejection> { + ) -> Result>, warp::Rejection> { let root = self.root(chain)?.0; let Some(data_availability_boundary) = chain.data_availability_boundary() else { return Err(warp_utils::reject::custom_not_found("Eip4844 fork disabled".into())); }; - match chain.get_blobs(&root, data_availability_boundary) { - Ok(Some(blob)) => Ok(Arc::new(blob)), + match chain.get_blobs(&root) { + Ok(Some(blob)) => todo!(), // Jimmy's PR will fix this, Ok(None) => Err(warp_utils::reject::custom_not_found(format!( "Blob with block root {} is not in the store", root diff --git a/beacon_node/http_api/src/build_block_contents.rs b/beacon_node/http_api/src/build_block_contents.rs index 1908c03ea1..6512197ef6 100644 --- a/beacon_node/http_api/src/build_block_contents.rs +++ b/beacon_node/http_api/src/build_block_contents.rs @@ -16,7 +16,7 @@ pub fn build_block_contents { let block_root = &block.canonical_root(); - if let Some(blob_sidecars) = chain.blob_cache.pop(block_root) { + if let Some(blob_sidecars) = chain.proposal_blob_cache.pop(block_root) { let block_and_blobs = BeaconBlockAndBlobSidecars { block, blob_sidecars, @@ -24,9 +24,9 @@ pub fn build_block_contents( let delay = get_block_delay_ms(seen_timestamp, block.message(), &chain.slot_clock); metrics::observe_duration(&metrics::HTTP_API_BLOCK_BROADCAST_DELAY_TIMES, delay); - let available_block = match wrapped_block.into_available_block(block_root, &chain) { - Ok(available_block) => available_block, - Err(e) => { - let msg = format!("{:?}", e); + let available_block = match wrapped_block.clone().into_available_block() { + Some(available_block) => available_block, + None => { error!( log, - "Invalid block provided to HTTP API"; - "reason" => &msg + "Invalid block provided to HTTP API unavailable block"; //TODO(sean) probably want a real error here ); - return Err(warp_utils::reject::broadcast_without_import(msg)); + return Err(warp_utils::reject::broadcast_without_import( + "unavailable block".to_string(), + )); } }; match chain .process_block( block_root, - available_block.clone(), + wrapped_block, CountUnrealized::True, NotifyExecutionLayer::Yes, ) .await { - Ok(root) => { + Ok(AvailabilityProcessingStatus::Imported(root)) => { info!( log, "Valid block from HTTP API"; "block_delay" => ?delay, "root" => format!("{}", root), - "proposer_index" => available_block.message().proposer_index(), - "slot" => available_block.slot(), + "proposer_index" => available_block.block.message().proposer_index(), + "slot" => available_block.block.slot(), ); // Notify the validator monitor. chain.validator_monitor.read().register_api_block( seen_timestamp, - available_block.message(), + available_block.block.message(), root, &chain.slot_clock, ); @@ -123,7 +124,7 @@ pub async fn publish_block( "Block was broadcast too late"; "msg" => "system may be overloaded, block likely to be orphaned", "delay_ms" => delay.as_millis(), - "slot" => available_block.slot(), + "slot" => available_block.block.slot(), "root" => ?root, ) } else if delay >= delayed_threshold { @@ -132,19 +133,37 @@ pub async fn publish_block( "Block broadcast was delayed"; "msg" => "system may be overloaded, block may be orphaned", "delay_ms" => delay.as_millis(), - "slot" => available_block.slot(), + "slot" => available_block.block.slot(), "root" => ?root, ) } Ok(()) } + Ok(AvailabilityProcessingStatus::PendingBlock(block_root)) => { + let msg = format!("Missing block with root {:?}", block_root); + error!( + log, + "Invalid block provided to HTTP API"; + "reason" => &msg + ); + Err(warp_utils::reject::broadcast_without_import(msg)) + } + Ok(AvailabilityProcessingStatus::PendingBlobs(blob_ids)) => { + let msg = format!("Missing blobs {:?}", blob_ids); + error!( + log, + "Invalid block provided to HTTP API"; + "reason" => &msg + ); + Err(warp_utils::reject::broadcast_without_import(msg)) + } Err(BlockError::BlockIsAlreadyKnown) => { info!( log, "Block from HTTP API already known"; "block" => ?block_root, - "slot" => available_block.slot(), + "slot" => available_block.block.slot(), ); Ok(()) } diff --git a/beacon_node/http_api/tests/interactive_tests.rs b/beacon_node/http_api/tests/interactive_tests.rs index 7db1b22d67..00fa7faff0 100644 --- a/beacon_node/http_api/tests/interactive_tests.rs +++ b/beacon_node/http_api/tests/interactive_tests.rs @@ -513,12 +513,13 @@ pub async fn proposer_boost_re_org_test( let randao_reveal = harness .sign_randao_reveal(&state_b, proposer_index, slot_c) .into(); - let unsigned_block_c = tester + let unsigned_block_contents_c = tester .client .get_validator_blocks(slot_c, &randao_reveal, None) .await .unwrap() .data; + let unsigned_block_c = unsigned_block_contents_c.deconstruct().0; let block_c = harness.sign_beacon_block(unsigned_block_c, &state_b); if should_re_org { @@ -700,7 +701,9 @@ pub async fn fork_choice_before_proposal() { .get_validator_blocks::>(slot_d, &randao_reveal, None) .await .unwrap() - .data; + .data + .deconstruct() + .0; // Head is now B. assert_eq!( diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 977c737fd0..dae17006bc 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -61,8 +61,8 @@ struct ApiTester { harness: Arc>>, chain: Arc>>, client: BeaconNodeHttpClient, - next_block: SignedBeaconBlock, - reorg_block: SignedBeaconBlock, + next_block: SignedBlockContents, + reorg_block: SignedBlockContents, attestations: Vec>, contribution_and_proofs: Vec>, attester_slashing: AttesterSlashing, @@ -154,11 +154,13 @@ impl ApiTester { let (next_block, _next_state) = harness .make_block(head.beacon_state.clone(), harness.chain.slot().unwrap()) .await; + let next_block = SignedBlockContents::from(next_block); // `make_block` adds random graffiti, so this will produce an alternate block let (reorg_block, _reorg_state) = harness .make_block(head.beacon_state.clone(), harness.chain.slot().unwrap()) .await; + let reorg_block = SignedBlockContents::from(reorg_block); let head_state_root = head.beacon_state_root(); let attestations = harness @@ -288,11 +290,13 @@ impl ApiTester { let (next_block, _next_state) = harness .make_block(head.beacon_state.clone(), harness.chain.slot().unwrap()) .await; + let next_block = SignedBlockContents::from(next_block); // `make_block` adds random graffiti, so this will produce an alternate block let (reorg_block, _reorg_state) = harness .make_block(head.beacon_state.clone(), harness.chain.slot().unwrap()) .await; + let reorg_block = SignedBlockContents::from(reorg_block); let head_state_root = head.beacon_state_root(); let attestations = harness @@ -975,9 +979,9 @@ impl ApiTester { } pub async fn test_post_beacon_blocks_valid(mut self) -> Self { - let next_block = &self.next_block; + let next_block = self.next_block.clone(); - self.client.post_beacon_blocks(next_block).await.unwrap(); + self.client.post_beacon_blocks(&next_block).await.unwrap(); assert!( self.network_rx.network_recv.recv().await.is_some(), @@ -988,10 +992,14 @@ impl ApiTester { } pub async fn test_post_beacon_blocks_invalid(mut self) -> Self { - let mut next_block = self.next_block.clone(); + let mut next_block = self.next_block.clone().deconstruct().0; *next_block.message_mut().proposer_index_mut() += 1; - assert!(self.client.post_beacon_blocks(&next_block).await.is_err()); + assert!(self + .client + .post_beacon_blocks(&SignedBlockContents::from(next_block)) + .await + .is_err()); assert!( self.network_rx.network_recv.recv().await.is_some(), @@ -2065,11 +2073,17 @@ impl ApiTester { .get_validator_blocks::>(slot, &randao_reveal, None) .await .unwrap() - .data; + .data + .deconstruct() + .0; let signed_block = block.sign(&sk, &fork, genesis_validators_root, &self.chain.spec); + let signed_block_contents = SignedBlockContents::from(signed_block.clone()); - self.client.post_beacon_blocks(&signed_block).await.unwrap(); + self.client + .post_beacon_blocks(&signed_block_contents) + .await + .unwrap(); assert_eq!(self.chain.head_beacon_block().as_ref(), &signed_block); @@ -2093,7 +2107,9 @@ impl ApiTester { ) .await .unwrap() - .data; + .data + .deconstruct() + .0; assert_eq!(block.slot(), slot); self.chain.slot_clock.set_slot(slot.as_u64() + 1); } @@ -3760,12 +3776,12 @@ impl ApiTester { // Submit the next block, which is on an epoch boundary, so this will produce a finalized // checkpoint event, head event, and block event - let block_root = self.next_block.canonical_root(); + let block_root = self.next_block.signed_block().canonical_root(); // current_duty_dependent_root = block root because this is the first slot of the epoch let current_duty_dependent_root = self.chain.head_beacon_block_root(); let current_slot = self.chain.slot().unwrap(); - let next_slot = self.next_block.slot(); + let next_slot = self.next_block.signed_block().slot(); let finalization_distance = E::slots_per_epoch() * 2; let expected_block = EventKind::Block(SseBlock { @@ -3777,7 +3793,7 @@ impl ApiTester { let expected_head = EventKind::Head(SseHead { block: block_root, slot: next_slot, - state: self.next_block.state_root(), + state: self.next_block.signed_block().state_root(), current_duty_dependent_root, previous_duty_dependent_root: self .chain @@ -3826,13 +3842,17 @@ impl ApiTester { .unwrap(); let expected_reorg = EventKind::ChainReorg(SseChainReorg { - slot: self.next_block.slot(), + slot: self.next_block.signed_block().slot(), depth: 1, - old_head_block: self.next_block.canonical_root(), - old_head_state: self.next_block.state_root(), - new_head_block: self.reorg_block.canonical_root(), - new_head_state: self.reorg_block.state_root(), - epoch: self.next_block.slot().epoch(E::slots_per_epoch()), + old_head_block: self.next_block.signed_block().canonical_root(), + old_head_state: self.next_block.signed_block().state_root(), + new_head_block: self.reorg_block.signed_block().canonical_root(), + new_head_state: self.reorg_block.signed_block().state_root(), + epoch: self + .next_block + .signed_block() + .slot() + .epoch(E::slots_per_epoch()), execution_optimistic: false, }); @@ -3894,8 +3914,8 @@ impl ApiTester { .await .unwrap(); - let block_root = self.next_block.canonical_root(); - let next_slot = self.next_block.slot(); + let block_root = self.next_block.signed_block().canonical_root(); + let next_slot = self.next_block.signed_block().slot(); let expected_block = EventKind::Block(SseBlock { block: block_root, @@ -3906,7 +3926,7 @@ impl ApiTester { let expected_head = EventKind::Head(SseHead { block: block_root, slot: next_slot, - state: self.next_block.state_root(), + state: self.next_block.signed_block().state_root(), current_duty_dependent_root: self.chain.genesis_block_root, previous_duty_dependent_root: self.chain.genesis_block_root, epoch_transition: false, diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index bf7d9ca2a8..8125ea9303 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -20,10 +20,9 @@ use tokio_util::{ codec::Framed, compat::{Compat, FuturesAsyncReadCompatExt}, }; -use types::BlobsSidecar; use types::{ BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockCapella, BeaconBlockMerge, - EmptyBlock, EthSpec, ForkContext, ForkName, Hash256, MainnetEthSpec, Signature, + BlobSidecar, EmptyBlock, EthSpec, ForkContext, ForkName, Hash256, MainnetEthSpec, Signature, SignedBeaconBlock, }; @@ -115,12 +114,12 @@ lazy_static! { .as_ssz_bytes() .len(); - pub static ref BLOBS_SIDECAR_MIN: usize = BlobsSidecar::::empty().as_ssz_bytes().len(); - pub static ref BLOBS_SIDECAR_MAX: usize = BlobsSidecar::::max_size(); + pub static ref BLOB_SIDECAR_MIN: usize = BlobSidecar::::empty().as_ssz_bytes().len(); + pub static ref BLOB_SIDECAR_MAX: usize = BlobSidecar::::max_size(); //FIXME(sean) these are underestimates - pub static ref SIGNED_BLOCK_AND_BLOBS_MIN: usize = *BLOBS_SIDECAR_MIN + *SIGNED_BEACON_BLOCK_BASE_MIN; - pub static ref SIGNED_BLOCK_AND_BLOBS_MAX: usize =*BLOBS_SIDECAR_MAX + *SIGNED_BEACON_BLOCK_EIP4844_MAX; + pub static ref SIGNED_BLOCK_AND_BLOBS_MIN: usize = *BLOB_SIDECAR_MIN + *SIGNED_BEACON_BLOCK_BASE_MIN; + pub static ref SIGNED_BLOCK_AND_BLOBS_MAX: usize =*BLOB_SIDECAR_MAX + *SIGNED_BEACON_BLOCK_EIP4844_MAX; } /// The maximum bytes that can be sent across the RPC pre-merge. @@ -385,7 +384,7 @@ impl ProtocolId { Protocol::Goodbye => RpcLimits::new(0, 0), // Goodbye request has no response Protocol::BlocksByRange => rpc_block_limits_by_fork(fork_context.current_fork()), Protocol::BlocksByRoot => rpc_block_limits_by_fork(fork_context.current_fork()), - Protocol::BlobsByRange => RpcLimits::new(*BLOBS_SIDECAR_MIN, *BLOBS_SIDECAR_MAX), + Protocol::BlobsByRange => RpcLimits::new(*BLOB_SIDECAR_MIN, *BLOB_SIDECAR_MAX), Protocol::BlobsByRoot => { // TODO: wrong too RpcLimits::new(*SIGNED_BLOCK_AND_BLOBS_MIN, *SIGNED_BLOCK_AND_BLOBS_MAX) diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index 95d8a294c1..2b7f17f4b7 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -41,7 +41,7 @@ num_cpus = "1.13.0" lru_cache = { path = "../../common/lru_cache" } if-addrs = "0.6.4" strum = "0.24.0" -tokio-util = { version = "0.6.3", features = ["time"] } +tokio-util = { version = "0.7.7", features = ["time"] } derivative = "2.2.0" delay_map = "0.1.1" ethereum-types = { version = "0.14.1", optional = true } diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index 410389b119..240bf24285 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -449,7 +449,7 @@ impl WorkEvent { peer_id: PeerId, peer_client: Client, blob_index: u64, - signed_blob: Arc>, + signed_blob: SignedBlobSidecar, seen_timestamp: Duration, ) -> Self { Self { @@ -729,7 +729,7 @@ impl WorkEvent { impl std::convert::From> for WorkEvent { fn from(ready_work: ReadyWork) -> Self { match ready_work { - ReadyWork::Block(QueuedGossipBlock { + ReadyWork::GossipBlock(QueuedGossipBlock { peer_id, block, seen_timestamp, @@ -864,7 +864,7 @@ pub enum Work { peer_id: PeerId, peer_client: Client, blob_index: u64, - signed_blob: Arc>, + signed_blob: SignedBlobSidecar, seen_timestamp: Duration, }, DelayedImportBlock { @@ -1760,6 +1760,7 @@ impl BeaconProcessor { peer_client, blob_index, signed_blob, + work_reprocessing_tx, seen_timestamp, ) .await diff --git a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs index 4d0bdc0027..e344a61132 100644 --- a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs +++ b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs @@ -20,7 +20,7 @@ use futures::task::Poll; use futures::{Stream, StreamExt}; use lighthouse_network::{MessageId, PeerId}; use logging::TimeLatch; -use slog::{crit, debug, error, trace, warn, Logger}; +use slog::{debug, error, trace, warn, Logger}; use slot_clock::SlotClock; use std::collections::{HashMap, HashSet}; use std::pin::Pin; @@ -28,7 +28,6 @@ use std::task::Context; use std::time::Duration; use task_executor::TaskExecutor; use tokio::sync::mpsc::{self, Receiver, Sender}; -use tokio::time::error::Error as TimeError; use tokio_util::time::delay_queue::{DelayQueue, Key as DelayKey}; use types::{ Attestation, EthSpec, Hash256, LightClientOptimisticUpdate, SignedAggregateAndProof, SubnetId, @@ -87,7 +86,7 @@ pub enum ReprocessQueueMessage { /// Events sent by the scheduler once they are ready for re-processing. pub enum ReadyWork { - Block(QueuedGossipBlock), + GossipBlock(QueuedGossipBlock), RpcBlock(QueuedRpcBlock), Unaggregate(QueuedUnaggregate), Aggregate(QueuedAggregate), @@ -154,8 +153,6 @@ enum InboundEvent { ReadyAttestation(QueuedAttestationId), /// A light client update that is ready for re-processing. ReadyLightClientUpdate(QueuedLightClientUpdateId), - /// A `DelayQueue` returned an error. - DelayQueueError(TimeError, &'static str), /// A message sent to the `ReprocessQueue` Msg(ReprocessQueueMessage), } @@ -233,54 +230,42 @@ impl Stream for ReprocessQueue { // The sequential nature of blockchains means it is generally better to try and import all // existing blocks before new ones. match self.gossip_block_delay_queue.poll_expired(cx) { - Poll::Ready(Some(Ok(queued_block))) => { + Poll::Ready(Some(queued_block)) => { return Poll::Ready(Some(InboundEvent::ReadyGossipBlock( queued_block.into_inner(), ))); } - Poll::Ready(Some(Err(e))) => { - return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "gossip_block_queue"))); - } // `Poll::Ready(None)` means that there are no more entries in the delay queue and we // will continue to get this result until something else is added into the queue. Poll::Ready(None) | Poll::Pending => (), } match self.rpc_block_delay_queue.poll_expired(cx) { - Poll::Ready(Some(Ok(queued_block))) => { + Poll::Ready(Some(queued_block)) => { return Poll::Ready(Some(InboundEvent::ReadyRpcBlock(queued_block.into_inner()))); } - Poll::Ready(Some(Err(e))) => { - return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "rpc_block_queue"))); - } // `Poll::Ready(None)` means that there are no more entries in the delay queue and we // will continue to get this result until something else is added into the queue. Poll::Ready(None) | Poll::Pending => (), } match self.attestations_delay_queue.poll_expired(cx) { - Poll::Ready(Some(Ok(attestation_id))) => { + Poll::Ready(Some(attestation_id)) => { return Poll::Ready(Some(InboundEvent::ReadyAttestation( attestation_id.into_inner(), ))); } - Poll::Ready(Some(Err(e))) => { - return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "attestations_queue"))); - } // `Poll::Ready(None)` means that there are no more entries in the delay queue and we // will continue to get this result until something else is added into the queue. Poll::Ready(None) | Poll::Pending => (), } match self.lc_updates_delay_queue.poll_expired(cx) { - Poll::Ready(Some(Ok(lc_id))) => { + Poll::Ready(Some(lc_id)) => { return Poll::Ready(Some(InboundEvent::ReadyLightClientUpdate( lc_id.into_inner(), ))); } - Poll::Ready(Some(Err(e))) => { - return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "lc_updates_queue"))); - } // `Poll::Ready(None)` means that there are no more entries in the delay queue and we // will continue to get this result until something else is added into the queue. Poll::Ready(None) | Poll::Pending => (), @@ -400,7 +385,7 @@ impl ReprocessQueue { if block_slot <= now && self .ready_work_tx - .try_send(ReadyWork::Block(early_block)) + .try_send(ReadyWork::GossipBlock(early_block)) .is_err() { error!( @@ -680,7 +665,7 @@ impl ReprocessQueue { if self .ready_work_tx - .try_send(ReadyWork::Block(ready_block)) + .try_send(ReadyWork::GossipBlock(ready_block)) .is_err() { error!( @@ -689,14 +674,7 @@ impl ReprocessQueue { ); } } - InboundEvent::DelayQueueError(e, queue_name) => { - crit!( - log, - "Failed to poll queue"; - "queue" => queue_name, - "e" => ?e - ) - } + InboundEvent::ReadyAttestation(queued_id) => { metrics::inc_counter( &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_ATTESTATIONS, diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 4d7c949dc0..f60843ab9b 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -1,7 +1,8 @@ use crate::{metrics, service::NetworkMessage, sync::SyncMessage}; -use beacon_chain::blob_verification::{AsBlock, BlockWrapper}; +use beacon_chain::blob_verification::{AsBlock, BlockWrapper, GossipVerifiedBlob}; use beacon_chain::store::Error; +use beacon_chain::ExecutedBlock; use beacon_chain::{ attestation_verification::{self, Error as AttnError, VerifiedAttestation}, light_client_finality_update_verification::Error as LightClientFinalityUpdateError, @@ -9,15 +10,14 @@ use beacon_chain::{ observed_operations::ObservationOutcome, sync_committee_verification::{self, Error as SyncCommitteeError}, validator_monitor::get_block_delay_ms, - BeaconChainError, BeaconChainTypes, BlockError, CountUnrealized, ForkChoiceError, - GossipVerifiedBlock, NotifyExecutionLayer, + AvailabilityProcessingStatus, BeaconChainError, BeaconChainTypes, BlockError, CountUnrealized, + ForkChoiceError, GossipVerifiedBlock, NotifyExecutionLayer, }; use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource}; use operation_pool::ReceivedPreCapella; use slog::{crit, debug, error, info, trace, warn}; use slot_clock::SlotClock; use ssz::Encode; -use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use store::hot_cold_store::HotColdDBError; use tokio::sync::mpsc; @@ -652,21 +652,66 @@ impl Worker { #[allow(clippy::too_many_arguments)] pub async fn process_gossip_blob( self, - message_id: MessageId, + _message_id: MessageId, peer_id: PeerId, peer_client: Client, blob_index: u64, - signed_blob: Arc>, - seen_duration: Duration, + signed_blob: SignedBlobSidecar, + reprocess_tx: mpsc::Sender>, + _seen_duration: Duration, ) { - // TODO: gossip verification - crit!(self.log, "UNIMPLEMENTED gossip blob verification"; - "peer_id" => %peer_id, - "client" => %peer_client, - "blob_topic" => blob_index, - "blob_index" => signed_blob.message.index, - "blob_slot" => signed_blob.message.slot - ); + match self + .chain + .verify_blob_sidecar_for_gossip(signed_blob, blob_index) + { + Ok(gossip_verified_blob) => { + self.process_gossip_verified_blob( + peer_id, + gossip_verified_blob, + reprocess_tx, + _seen_duration, + ) + .await + } + Err(_) => { + // TODO(pawan): handle all blob errors for peer scoring + todo!() + } + } + } + + pub async fn process_gossip_verified_blob( + self, + peer_id: PeerId, + verified_blob: GossipVerifiedBlob, + reprocess_tx: mpsc::Sender>, + // This value is not used presently, but it might come in handy for debugging. + _seen_duration: Duration, + ) { + // TODO + match self + .chain + .process_blob(verified_blob.to_blob(), CountUnrealized::True) + .await + { + Ok(AvailabilityProcessingStatus::Imported(hash)) => { + todo!() + // add to metrics + // logging + } + Ok(AvailabilityProcessingStatus::PendingBlobs(pending_blobs)) => self + .send_sync_message(SyncMessage::UnknownBlobHash { + peer_id, + pending_blobs, + }), + Ok(AvailabilityProcessingStatus::PendingBlock(block_hash)) => { + self.send_sync_message(SyncMessage::UnknownBlockHash(peer_id, block_hash)); + } + Err(e) => { + // handle errors + todo!() + } + } } /// Process the beacon block received from the gossip network and: @@ -802,6 +847,9 @@ impl Worker { verified_block } + Err(BlockError::AvailabilityCheck(e)) => { + todo!() + } Err(BlockError::ParentUnknown(block)) => { debug!( self.log, @@ -964,28 +1012,30 @@ impl Worker { /// Process the beacon block that has already passed gossip verification. /// /// Raises a log if there are errors. - pub async fn process_gossip_verified_block( + pub async fn process_execution_verified_block( self, peer_id: PeerId, - verified_block: GossipVerifiedBlock, + executed_block: ExecutedBlock, reprocess_tx: mpsc::Sender>, // This value is not used presently, but it might come in handy for debugging. - _seen_duration: Duration, + seen_duration: Duration, ) { - let block = verified_block.block.block_cloned(); - let block_root = verified_block.block_root; + let block_root = executed_block.block_root; + let block = executed_block.block.block_cloned(); match self .chain - .process_block( - block_root, - verified_block, + .check_availability_and_maybe_import( + |chain| { + chain + .data_availability_checker + .check_block_availability(executed_block) + }, CountUnrealized::True, - NotifyExecutionLayer::Yes, ) .await { - Ok(BlockProcessingResult::Verified(block_root)) => { + Ok(AvailabilityProcessingStatus::Imported(block_root)) => { metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL); if reprocess_tx @@ -1012,8 +1062,119 @@ impl Worker { self.chain.recompute_head_at_current_slot().await; } - Ok(BlockProcessingResult::AvailabilityPending(executed_block)) => { - // cache in blob cache and make rpc request for blob + Ok(AvailabilityProcessingStatus::PendingBlobs(_)) + | Ok(AvailabilityProcessingStatus::PendingBlock(_)) + | Err(BlockError::AvailabilityCheck(_)) => { + // TODO(need to do something different if it's unavailble again) + unimplemented!() + } + Err(BlockError::ParentUnknown(block)) => { + // Inform the sync manager to find parents for this block + // This should not occur. It should be checked by `should_forward_block` + error!( + self.log, + "Block with unknown parent attempted to be processed"; + "peer_id" => %peer_id + ); + self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block, block_root)); + } + Err(ref e @ BlockError::ExecutionPayloadError(ref epe)) if !epe.penalize_peer() => { + debug!( + self.log, + "Failed to verify execution payload"; + "error" => %e + ); + } + other => { + debug!( + self.log, + "Invalid gossip beacon block"; + "outcome" => ?other, + "block root" => ?block_root, + "block slot" => block.slot() + ); + self.gossip_penalize_peer( + peer_id, + PeerAction::MidToleranceError, + "bad_gossip_block_ssz", + ); + trace!( + self.log, + "Invalid gossip beacon block ssz"; + "ssz" => format_args!("0x{}", hex::encode(block.as_ssz_bytes())), + ); + } + }; + } + + /// Process the beacon block that has already passed gossip verification. + /// + /// Raises a log if there are errors. + pub async fn process_gossip_verified_block( + self, + peer_id: PeerId, + verified_block: GossipVerifiedBlock, + reprocess_tx: mpsc::Sender>, + // This value is not used presently, but it might come in handy for debugging. + seen_duration: Duration, + ) { + let block = verified_block.block.block_cloned(); + let block_root = verified_block.block_root; + + match self + .chain + .process_block( + block_root, + verified_block, + CountUnrealized::True, + NotifyExecutionLayer::Yes, + ) + .await + { + Ok(AvailabilityProcessingStatus::Imported(block_root)) => { + metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL); + + if reprocess_tx + .try_send(ReprocessQueueMessage::BlockImported { + block_root, + parent_root: block.message().parent_root(), + }) + .is_err() + { + error!( + self.log, + "Failed to inform block import"; + "source" => "gossip", + "block_root" => ?block_root, + ) + }; + + debug!( + self.log, + "Gossipsub block processed"; + "block" => ?block_root, + "peer_id" => %peer_id + ); + + self.chain.recompute_head_at_current_slot().await; + } + Ok(AvailabilityProcessingStatus::PendingBlock(block_root)) => { + // This error variant doesn't make any sense in this context + crit!( + self.log, + "Internal error. Cannot get AvailabilityProcessingStatus::PendingBlock on processing block"; + "block_root" => %block_root + ); + } + Ok(AvailabilityProcessingStatus::PendingBlobs(pending_blobs)) => { + // make rpc request for blob + self.send_sync_message(SyncMessage::UnknownBlobHash { + peer_id, + pending_blobs, + }); + } + Err(BlockError::AvailabilityCheck(_)) => { + todo!() } Err(BlockError::ParentUnknown(block)) => { // Inform the sync manager to find parents for this block diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 4480f37130..a42b56794a 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -233,36 +233,37 @@ impl Worker { .get_block_and_blobs_checking_early_attester_cache(&root) .await { - Ok(Some(block_and_blobs)) => { - // - // TODO: HORRIBLE NSFW CODE AHEAD - // - let types::SignedBeaconBlockAndBlobsSidecar {beacon_block, blobs_sidecar} = block_and_blobs; - let types::BlobsSidecar{ beacon_block_root, beacon_block_slot, blobs: blob_bundle, kzg_aggregated_proof }: types::BlobsSidecar<_> = blobs_sidecar.as_ref().clone(); - // TODO: this should be unreachable after this is addressed seriously, - // so for now let's be ok with a panic in the expect. - let block = beacon_block.message_eip4844().expect("We fucked up the block blob stuff"); - // Intentionally not accessing the list directly - for (known_index, blob) in blob_bundle.into_iter().enumerate() { - if (known_index as u64) == index { - let blob_sidecar = types::BlobSidecar{ - block_root: beacon_block_root, - index, - slot: beacon_block_slot, - block_parent_root: block.parent_root, - proposer_index: block.proposer_index, - blob, - kzg_commitment: block.body.blob_kzg_commitments[known_index].clone(), // TODO: needs to be stored in a more logical way so that this won't panic. - kzg_proof: kzg_aggregated_proof // TODO: yeah - }; - self.send_response( - peer_id, - Response::BlobsByRoot(Some(Arc::new(blob_sidecar))), - request_id, - ); - send_block_count += 1; - } - } + Ok(Some(())) => { + todo!(); + // // + // // TODO: HORRIBLE NSFW CODE AHEAD + // // + // let types::SignedBeaconBlockAndBlobsSidecar {beacon_block, blobs_sidecar} = block_and_blobs; + // let types::BlobsSidecar{ beacon_block_root, beacon_block_slot, blobs: blob_bundle, kzg_aggregated_proof }: types::BlobsSidecar<_> = blobs_sidecar.as_ref().clone(); + // // TODO: this should be unreachable after this is addressed seriously, + // // so for now let's be ok with a panic in the expect. + // let block = beacon_block.message_eip4844().expect("We fucked up the block blob stuff"); + // // Intentionally not accessing the list directly + // for (known_index, blob) in blob_bundle.into_iter().enumerate() { + // if (known_index as u64) == index { + // let blob_sidecar = types::BlobSidecar{ + // block_root: beacon_block_root, + // index, + // slot: beacon_block_slot, + // block_parent_root: block.parent_root, + // proposer_index: block.proposer_index, + // blob, + // kzg_commitment: block.body.blob_kzg_commitments[known_index], // TODO: needs to be stored in a more logical way so that this won't panic. + // kzg_proof: kzg_aggregated_proof // TODO: yeah + // }; + // self.send_response( + // peer_id, + // Response::BlobsByRoot(Some(Arc::new(blob_sidecar))), + // request_id, + // ); + // send_block_count += 1; + // } + // } } Ok(None) => { debug!( @@ -836,35 +837,36 @@ impl Worker { let mut send_response = true; for root in block_roots { - match self.chain.get_blobs(&root, data_availability_boundary) { + match self.chain.get_blobs(&root) { Ok(Some(blobs)) => { - // TODO: more GROSS code ahead. Reader beware - let types::BlobsSidecar { - beacon_block_root, - beacon_block_slot, - blobs: blob_bundle, - kzg_aggregated_proof, - }: types::BlobsSidecar<_> = blobs; - - for (blob_index, blob) in blob_bundle.into_iter().enumerate() { - let blob_sidecar = types::BlobSidecar { - block_root: beacon_block_root, - index: blob_index as u64, - slot: beacon_block_slot, - block_parent_root: Hash256::zero(), - proposer_index: 0, - blob, - kzg_commitment: types::KzgCommitment::default(), - kzg_proof: types::KzgProof::default(), - }; - - blobs_sent += 1; - self.send_network_message(NetworkMessage::SendResponse { - peer_id, - response: Response::BlobsByRange(Some(Arc::new(blob_sidecar))), - id: request_id, - }); - } + todo!(); + // // TODO: more GROSS code ahead. Reader beware + // let types::BlobsSidecar { + // beacon_block_root, + // beacon_block_slot, + // blobs: blob_bundle, + // kzg_aggregated_proof: _, + // }: types::BlobsSidecar<_> = blobs; + // + // for (blob_index, blob) in blob_bundle.into_iter().enumerate() { + // let blob_sidecar = types::BlobSidecar { + // block_root: beacon_block_root, + // index: blob_index as u64, + // slot: beacon_block_slot, + // block_parent_root: Hash256::zero(), + // proposer_index: 0, + // blob, + // kzg_commitment: types::KzgCommitment::default(), + // kzg_proof: types::KzgProof::default(), + // }; + // + // blobs_sent += 1; + // self.send_network_message(NetworkMessage::SendResponse { + // peer_id, + // response: Response::BlobsByRange(Some(Arc::new(blob_sidecar))), + // id: request_id, + // }); + // } } Ok(None) => { error!( diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs index 093e69729b..08c8200bc8 100644 --- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -8,7 +8,7 @@ use crate::metrics; use crate::sync::manager::{BlockProcessType, SyncMessage}; use crate::sync::{BatchProcessResult, ChainId}; use beacon_chain::blob_verification::{AsBlock, BlockWrapper, IntoAvailableBlock}; -use beacon_chain::CountUnrealized; +use beacon_chain::{AvailabilityProcessingStatus, CountUnrealized}; use beacon_chain::{ BeaconChainError, BeaconChainTypes, BlockError, ChainSegmentResult, HistoricalBlockError, NotifyExecutionLayer, @@ -86,32 +86,22 @@ impl Worker { }; let slot = block.slot(); let parent_root = block.message().parent_root(); - let available_block = block - .into_available_block(block_root, &self.chain) - .map_err(BlockError::BlobValidation); - let result = match available_block { - Ok(BlockProcessingResult::Verified(block)) => { - self.chain - .process_block( - block_root, - block, - CountUnrealized::True, - NotifyExecutionLayer::Yes, - ) - .await - } - Ok(BlockProcessingResult::AvailabilityPending(executed_block)) => { - // Shouldn't happen as sync should only send blocks for processing - // after sending blocks into the availability cache. - } - Err(e) => Err(e), - }; + let result = self + .chain + .process_block( + block_root, + block, + CountUnrealized::True, + NotifyExecutionLayer::Yes, + ) + .await; metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL); // RPC block imported, regardless of process type - if let &Ok(hash) = &result { + //TODO(sean) handle pending availability variants + if let &Ok(AvailabilityProcessingStatus::Imported(hash)) = &result { info!(self.log, "New RPC block received"; "slot" => slot, "hash" => %hash); // Trigger processing for work referencing this block. diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index a18ce4e7c8..067d84044f 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -257,7 +257,7 @@ impl Router { peer_id, self.network_globals.client(&peer_id), blob_index, - Arc::new(signed_blob), + signed_blob, ); } PubsubMessage::VoluntaryExit(exit) => { diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 56a5f24586..281d0ef616 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -258,7 +258,7 @@ impl Processor { ); if let RequestId::Sync(id) = request_id { - self.send_to_sync(SyncMessage::RpcBlobs { + self.send_to_sync(SyncMessage::RpcBlob { peer_id, request_id: id, blob_sidecar, @@ -330,7 +330,7 @@ impl Processor { "Received BlobsByRoot Response"; "peer" => %peer_id, ); - self.send_to_sync(SyncMessage::RpcBlobs { + self.send_to_sync(SyncMessage::RpcBlob { request_id, peer_id, blob_sidecar, @@ -365,7 +365,7 @@ impl Processor { peer_id: PeerId, peer_client: Client, blob_index: u64, // TODO: add a type for the blob index - signed_blob: Arc>, + signed_blob: SignedBlobSidecar, ) { self.send_beacon_processor_work(BeaconWorkEvent::gossip_signed_blob_sidecar( message_id, diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 438317d1cd..67db9a7a32 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -32,7 +32,7 @@ impl BlocksAndBlobsRequestInfo { pub fn into_responses(self) -> Result>, &'static str> { let BlocksAndBlobsRequestInfo { accumulated_blocks, - mut accumulated_sidecars, + accumulated_sidecars, .. } = self; diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 353d3e896e..ad7cad3216 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -35,7 +35,7 @@ use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart}; use super::block_lookups::BlockLookups; -use super::network_context::{BlockOrBlobs, SyncNetworkContext}; +use super::network_context::{BlockOrBlob, SyncNetworkContext}; use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent}; @@ -56,6 +56,7 @@ use std::ops::Sub; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; +use types::blob_sidecar::BlobIdentifier; use types::{BlobSidecar, EthSpec, Hash256, SignedBeaconBlock, Slot}; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync @@ -86,6 +87,10 @@ pub enum RequestId { RangeBlobs { id: Id }, } +// TODO(diva) I'm updating functions what at a time, but this should be revisited because I think +// some code paths that are split for blobs and blocks can be made just one after sync as a whole +// is updated. + #[derive(Debug)] /// A message that can be sent to the sync manager thread. pub enum SyncMessage { @@ -101,7 +106,7 @@ pub enum SyncMessage { }, /// A blob has been received from the RPC. - RpcBlobs { + RpcBlob { request_id: RequestId, peer_id: PeerId, blob_sidecar: Option>>, @@ -115,6 +120,13 @@ pub enum SyncMessage { /// manager to attempt to find the block matching the unknown hash. UnknownBlockHash(PeerId, Hash256), + /// A peer has sent us a block that we haven't received all the blobs for. This triggers + /// the manager to attempt to find the pending blobs for the given block root. + UnknownBlobHash { + peer_id: PeerId, + pending_blobs: Vec, + }, + /// A peer has disconnected. Disconnect(PeerId), @@ -554,7 +566,12 @@ impl SyncManager { beacon_block, seen_timestamp, } => { - self.rpc_block_received(request_id, peer_id, beacon_block, seen_timestamp); + self.rpc_block_or_blob_received( + request_id, + peer_id, + beacon_block.into(), + seen_timestamp, + ); } SyncMessage::UnknownBlock(peer_id, block, block_root) => { // If we are not synced or within SLOT_IMPORT_TOLERANCE of the block, ignore @@ -589,6 +606,9 @@ impl SyncManager { .search_block(block_hash, peer_id, &mut self.network); } } + SyncMessage::UnknownBlobHash { .. } => { + unimplemented!() + } SyncMessage::Disconnect(peer_id) => { self.peer_disconnect(&peer_id); } @@ -638,12 +658,17 @@ impl SyncManager { .block_lookups .parent_chain_processed(chain_hash, result, &mut self.network), }, - SyncMessage::RpcBlobs { + SyncMessage::RpcBlob { request_id, peer_id, blob_sidecar, seen_timestamp, - } => self.rpc_blobs_received(request_id, peer_id, blob_sidecar, seen_timestamp), + } => self.rpc_block_or_blob_received( + request_id, + peer_id, + blob_sidecar.into(), + seen_timestamp, + ), } } @@ -702,30 +727,50 @@ impl SyncManager { } } - fn rpc_block_received( + fn rpc_block_or_blob_received( &mut self, request_id: RequestId, peer_id: PeerId, - beacon_block: Option>>, + block_or_blob: BlockOrBlob, seen_timestamp: Duration, ) { match request_id { - RequestId::SingleBlock { id } => self.block_lookups.single_block_lookup_response( - id, - peer_id, - beacon_block.map(|block| block.into()), - seen_timestamp, - &mut self.network, - ), - RequestId::ParentLookup { id } => self.block_lookups.parent_lookup_response( - id, - peer_id, - beacon_block.map(|block| block.into()), - seen_timestamp, - &mut self.network, - ), + RequestId::SingleBlock { id } => { + // TODO(diva) adjust when dealing with by root requests. This code is here to + // satisfy dead code analysis + match block_or_blob { + BlockOrBlob::Block(maybe_block) => { + self.block_lookups.single_block_lookup_response( + id, + peer_id, + maybe_block.map(BlockWrapper::AvailabilityPending), + seen_timestamp, + &mut self.network, + ) + } + BlockOrBlob::Sidecar(_) => unimplemented!("Mimatch between BlockWrapper and what the network receives needs to be handled first."), + } + } + RequestId::ParentLookup { id } => { + // TODO(diva) adjust when dealing with by root requests. This code is here to + // satisfy dead code analysis + match block_or_blob { + BlockOrBlob::Block(maybe_block) => self.block_lookups.parent_lookup_response( + id, + peer_id, + maybe_block.map(BlockWrapper::AvailabilityPending), + seen_timestamp, + &mut self.network, + ), + BlockOrBlob::Sidecar(_) => unimplemented!("Mimatch between BlockWrapper and what the network receives needs to be handled first."), + } + } RequestId::BackFillBlocks { id } => { - let is_stream_terminator = beacon_block.is_none(); + let maybe_block = match block_or_blob { + BlockOrBlob::Block(maybe_block) => maybe_block, + BlockOrBlob::Sidecar(_) => todo!("I think this is unreachable"), + }; + let is_stream_terminator = maybe_block.is_none(); if let Some(batch_id) = self .network .backfill_sync_only_blocks_response(id, is_stream_terminator) @@ -735,7 +780,7 @@ impl SyncManager { batch_id, &peer_id, id, - beacon_block.map(|block| block.into()), + maybe_block.map(|block| block.into()), ) { Ok(ProcessResult::SyncCompleted) => self.update_sync_state(), Ok(ProcessResult::Successful) => {} @@ -748,7 +793,11 @@ impl SyncManager { } } RequestId::RangeBlocks { id } => { - let is_stream_terminator = beacon_block.is_none(); + let maybe_block = match block_or_blob { + BlockOrBlob::Block(maybe_block) => maybe_block, + BlockOrBlob::Sidecar(_) => todo!("I think this should be unreachable, since this is a range only-blocks request, and the network should not accept this chunk at all. Needs better handling"), + }; + let is_stream_terminator = maybe_block.is_none(); if let Some((chain_id, batch_id)) = self .network .range_sync_block_response(id, is_stream_terminator) @@ -759,28 +808,28 @@ impl SyncManager { chain_id, batch_id, id, - beacon_block.map(|block| block.into()), + maybe_block.map(|block| block.into()), ); self.update_sync_state(); } } RequestId::BackFillBlobs { id } => { - self.blobs_backfill_response(id, peer_id, beacon_block.into()) + self.backfill_block_and_blobs_response(id, peer_id, block_or_blob) } RequestId::RangeBlobs { id } => { - self.blobs_range_response(id, peer_id, beacon_block.into()) + self.range_block_and_blobs_response(id, peer_id, block_or_blob) } } } /// Handles receiving a response for a range sync request that should have both blocks and /// blobs. - fn blobs_range_response( + fn range_block_and_blobs_response( &mut self, id: Id, peer_id: PeerId, - block_or_blob: BlockOrBlobs, + block_or_blob: BlockOrBlob, ) { if let Some((chain_id, resp)) = self .network @@ -822,11 +871,11 @@ impl SyncManager { /// Handles receiving a response for a Backfill sync request that should have both blocks and /// blobs. - fn blobs_backfill_response( + fn backfill_block_and_blobs_response( &mut self, id: Id, peer_id: PeerId, - block_or_blob: BlockOrBlobs, + block_or_blob: BlockOrBlob, ) { if let Some(resp) = self .network @@ -871,32 +920,6 @@ impl SyncManager { } } } - - fn rpc_blobs_received( - &mut self, - request_id: RequestId, - peer_id: PeerId, - maybe_blob: Option::EthSpec>>>, - _seen_timestamp: Duration, - ) { - match request_id { - RequestId::SingleBlock { .. } | RequestId::ParentLookup { .. } => { - unreachable!("There is no such thing as a singular 'by root' glob request that is not accompanied by the block") - } - RequestId::BackFillBlocks { .. } => { - unreachable!("An only blocks request does not receive sidecars") - } - RequestId::BackFillBlobs { .. } => { - unimplemented!("Adjust backfill sync"); - } - RequestId::RangeBlocks { .. } => { - unreachable!("Only-blocks range requests don't receive sidecars") - } - RequestId::RangeBlobs { id } => { - unimplemented!("Adjust range"); - } - } - } } impl From>> for BlockProcessResult { diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 10f7f32955..974d8dbd8c 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -75,20 +75,20 @@ pub struct SyncNetworkContext { } /// Small enumeration to make dealing with block and blob requests easier. -pub enum BlockOrBlobs { +pub enum BlockOrBlob { Block(Option>>), - Blobs(Option>>), + Sidecar(Option>>), } -impl From>>> for BlockOrBlobs { +impl From>>> for BlockOrBlob { fn from(block: Option>>) -> Self { - BlockOrBlobs::Block(block) + BlockOrBlob::Block(block) } } -impl From>>> for BlockOrBlobs { +impl From>>> for BlockOrBlob { fn from(blob: Option>>) -> Self { - BlockOrBlobs::Blobs(blob) + BlockOrBlob::Sidecar(blob) } } @@ -311,15 +311,15 @@ impl SyncNetworkContext { pub fn range_sync_block_and_blob_response( &mut self, request_id: Id, - block_or_blob: BlockOrBlobs, + block_or_blob: BlockOrBlob, ) -> Option<(ChainId, BlocksAndBlobsByRangeResponse)> { match self.range_blocks_and_blobs_requests.entry(request_id) { Entry::Occupied(mut entry) => { let req = entry.get_mut(); let info = &mut req.block_blob_info; match block_or_blob { - BlockOrBlobs::Block(maybe_block) => info.add_block_response(maybe_block), - BlockOrBlobs::Blobs(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), + BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block), + BlockOrBlob::Sidecar(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), } if info.is_finished() { // If the request is finished, dequeue everything @@ -402,14 +402,14 @@ impl SyncNetworkContext { pub fn backfill_sync_block_and_blob_response( &mut self, request_id: Id, - block_or_blob: BlockOrBlobs, + block_or_blob: BlockOrBlob, ) -> Option> { match self.backfill_blocks_and_blobs_requests.entry(request_id) { Entry::Occupied(mut entry) => { let (_, info) = entry.get_mut(); match block_or_blob { - BlockOrBlobs::Block(maybe_block) => info.add_block_response(maybe_block), - BlockOrBlobs::Blobs(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), + BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block), + BlockOrBlob::Sidecar(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), } if info.is_finished() { // If the request is finished, dequeue everything diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 60e2f77595..c833c2d345 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -38,6 +38,7 @@ use std::marker::PhantomData; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; +use types::blob_sidecar::BlobSidecarArcList; use types::consts::eip4844::MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS; use types::*; @@ -66,7 +67,7 @@ pub struct HotColdDB, Cold: ItemStore> { /// The hot database also contains all blocks. pub hot_db: Hot, /// LRU cache of deserialized blobs. Updated whenever a blob is loaded. - blob_cache: Mutex>>, + blob_cache: Mutex>>, /// LRU cache of deserialized blocks. Updated whenever a block is loaded. block_cache: Mutex>>, /// Chain spec. @@ -547,7 +548,7 @@ impl, Cold: ItemStore> HotColdDB /// Check if the blobs sidecar for a block exists on disk. pub fn blobs_sidecar_exists(&self, block_root: &Hash256) -> Result { - self.get_item::>(block_root) + self.get_item::>(block_root) .map(|blobs| blobs.is_some()) } @@ -568,7 +569,11 @@ impl, Cold: ItemStore> HotColdDB blobs_db.key_delete(DBColumn::BeaconBlob.into(), block_root.as_bytes()) } - pub fn put_blobs(&self, block_root: &Hash256, blobs: BlobsSidecar) -> Result<(), Error> { + pub fn put_blobs( + &self, + block_root: &Hash256, + blobs: BlobSidecarArcList, + ) -> Result<(), Error> { let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db); blobs_db.put_bytes( DBColumn::BeaconBlob.into(), @@ -582,7 +587,7 @@ impl, Cold: ItemStore> HotColdDB pub fn blobs_as_kv_store_ops( &self, key: &Hash256, - blobs: &BlobsSidecar, + blobs: &BlobSidecarArcList, ops: &mut Vec, ) { let db_key = get_key_for_col(DBColumn::BeaconBlob.into(), key.as_bytes()); @@ -925,8 +930,8 @@ impl, Cold: ItemStore> HotColdDB for op in blob_cache_ops.iter_mut() { let reverse_op = match op { StoreOp::PutBlobs(block_root, _) => StoreOp::DeleteBlobs(*block_root), - StoreOp::DeleteBlobs(_) => match blobs_to_delete.pop() { - Some(blobs) => StoreOp::PutBlobs(blobs.beacon_block_root, Arc::new(blobs)), + StoreOp::DeleteBlobs(block_root) => match blobs_to_delete.pop() { + Some(blobs) => StoreOp::PutBlobs(*block_root, blobs), None => return Err(HotColdDBError::Rollback.into()), }, _ => return Err(HotColdDBError::Rollback.into()), @@ -972,7 +977,7 @@ impl, Cold: ItemStore> HotColdDB for op in blob_cache_ops { match op { StoreOp::PutBlobs(block_root, blobs) => { - guard_blob.put(block_root, (*blobs).clone()); + guard_blob.put(block_root, blobs.clone()); } StoreOp::DeleteBlobs(block_root) => { @@ -1320,12 +1325,12 @@ impl, Cold: ItemStore> HotColdDB } /// Fetch a blobs sidecar from the store. - pub fn get_blobs(&self, block_root: &Hash256) -> Result>, Error> { + pub fn get_blobs(&self, block_root: &Hash256) -> Result>, Error> { let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db); match blobs_db.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? { Some(ref blobs_bytes) => { - let blobs = BlobsSidecar::from_ssz_bytes(blobs_bytes)?; + let blobs = BlobSidecarArcList::from_ssz_bytes(blobs_bytes)?; // FIXME(sean) I was attempting to use a blob cache here but was getting deadlocks, // may want to attempt to use one again self.blob_cache.lock().put(*block_root, blobs.clone()); diff --git a/beacon_node/store/src/impls/execution_payload.rs b/beacon_node/store/src/impls/execution_payload.rs index 01a2dba0b0..f4e6c1ea66 100644 --- a/beacon_node/store/src/impls/execution_payload.rs +++ b/beacon_node/store/src/impls/execution_payload.rs @@ -1,7 +1,7 @@ use crate::{DBColumn, Error, StoreItem}; use ssz::{Decode, Encode}; use types::{ - BlobsSidecar, EthSpec, ExecutionPayload, ExecutionPayloadCapella, ExecutionPayloadEip4844, + BlobSidecarList, EthSpec, ExecutionPayload, ExecutionPayloadCapella, ExecutionPayloadEip4844, ExecutionPayloadMerge, }; @@ -25,7 +25,7 @@ macro_rules! impl_store_item { impl_store_item!(ExecutionPayloadMerge); impl_store_item!(ExecutionPayloadCapella); impl_store_item!(ExecutionPayloadEip4844); -impl_store_item!(BlobsSidecar); +impl_store_item!(BlobSidecarList); /// This fork-agnostic implementation should be only used for writing. /// diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 3056c29292..bcda5b97bf 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -43,6 +43,7 @@ pub use metrics::scrape_for_metrics; use parking_lot::MutexGuard; use std::sync::Arc; use strum::{EnumString, IntoStaticStr}; +use types::blob_sidecar::BlobSidecarArcList; pub use types::*; pub type ColumnIter<'a> = Box), Error>> + 'a>; @@ -159,7 +160,7 @@ pub trait ItemStore: KeyValueStore + Sync + Send + Sized + 'stati pub enum StoreOp<'a, E: EthSpec> { PutBlock(Hash256, Arc>), PutState(Hash256, &'a BeaconState), - PutBlobs(Hash256, Arc>), + PutBlobs(Hash256, BlobSidecarArcList), PutOrphanedBlobsKey(Hash256), PutStateSummary(Hash256, HotStateSummary), PutStateTemporaryFlag(Hash256), diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 2a27d31da9..fec41b8bca 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -704,7 +704,7 @@ impl BeaconNodeHttpClient { pub async fn get_blobs_sidecar( &self, block_id: BlockId, - ) -> Result>>, Error> { + ) -> Result>>, Error> { let path = self.get_blobs_sidecar_path(block_id)?; let response = match self.get_response(path, |b| b).await.optional()? { Some(res) => res, diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index d328639120..bc27ddb474 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -5,6 +5,7 @@ use crate::Error as ServerError; use lighthouse_network::{ConnectionDirection, Enr, Multiaddr, PeerConnectionStatus}; use mime::{Mime, APPLICATION, JSON, OCTET_STREAM, STAR}; use serde::{Deserialize, Serialize}; +use ssz_derive::Encode; use std::cmp::Reverse; use std::convert::TryFrom; use std::fmt; @@ -1322,3 +1323,76 @@ impl> Into> } } } + +pub type BlockContentsTuple = ( + SignedBeaconBlock, + Option>, +); + +/// A wrapper over a [`SignedBeaconBlock`] or a [`SignedBeaconBlockAndBlobSidecars`]. +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(untagged)] +#[serde(bound = "T: EthSpec")] +pub enum SignedBlockContents = FullPayload> { + BlockAndBlobSidecars(SignedBeaconBlockAndBlobSidecars), + Block(SignedBeaconBlock), +} + +impl> SignedBlockContents { + pub fn signed_block(&self) -> &SignedBeaconBlock { + match self { + SignedBlockContents::BlockAndBlobSidecars(block_and_sidecars) => { + &block_and_sidecars.signed_block + } + SignedBlockContents::Block(block) => block, + } + } + + pub fn deconstruct(self) -> BlockContentsTuple { + match self { + SignedBlockContents::BlockAndBlobSidecars(block_and_sidecars) => ( + block_and_sidecars.signed_block, + Some(block_and_sidecars.signed_blob_sidecars), + ), + SignedBlockContents::Block(block) => (block, None), + } + } +} + +impl> From> + for SignedBlockContents +{ + fn from(block: SignedBeaconBlock) -> Self { + match block { + SignedBeaconBlock::Base(_) + | SignedBeaconBlock::Altair(_) + | SignedBeaconBlock::Merge(_) + | SignedBeaconBlock::Capella(_) => SignedBlockContents::Block(block), + //TODO: error handling, this should be try from + SignedBeaconBlock::Eip4844(_block) => todo!(), + } + } +} + +impl> From> + for SignedBlockContents +{ + fn from(block_contents_tuple: BlockContentsTuple) -> Self { + match block_contents_tuple { + (signed_block, None) => SignedBlockContents::Block(signed_block), + (signed_block, Some(signed_blob_sidecars)) => { + SignedBlockContents::BlockAndBlobSidecars(SignedBeaconBlockAndBlobSidecars { + signed_block, + signed_blob_sidecars, + }) + } + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, Encode)] +#[serde(bound = "T: EthSpec")] +pub struct SignedBeaconBlockAndBlobSidecars> { + pub signed_block: SignedBeaconBlock, + pub signed_blob_sidecars: SignedBlobSidecarList, +} diff --git a/consensus/state_processing/src/consensus_context.rs b/consensus/state_processing/src/consensus_context.rs index 4c8966f92c..37bd5fe446 100644 --- a/consensus/state_processing/src/consensus_context.rs +++ b/consensus/state_processing/src/consensus_context.rs @@ -7,7 +7,7 @@ use types::{ ChainSpec, Epoch, EthSpec, Hash256, IndexedAttestation, SignedBeaconBlock, Slot, }; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ConsensusContext { /// Slot to act as an identifier/safeguard slot: Slot, diff --git a/consensus/types/src/beacon_block_body.rs b/consensus/types/src/beacon_block_body.rs index c717396522..e49f633459 100644 --- a/consensus/types/src/beacon_block_body.rs +++ b/consensus/types/src/beacon_block_body.rs @@ -1,5 +1,5 @@ +use crate::test_utils::TestRandom; use crate::*; -use crate::{blobs_sidecar::KzgCommitments, test_utils::TestRandom}; use derivative::Derivative; use serde_derive::{Deserialize, Serialize}; use ssz_derive::{Decode, Encode}; @@ -9,6 +9,8 @@ use superstruct::superstruct; use test_random_derive::TestRandom; use tree_hash_derive::TreeHash; +pub type KzgCommitments = VariableList::MaxBlobsPerBlock>; + /// The body of a `BeaconChain` block, containing operations. /// /// This *superstruct* abstracts over the hard-fork. diff --git a/consensus/types/src/blob_sidecar.rs b/consensus/types/src/blob_sidecar.rs index 27523d588d..fbd0aebb12 100644 --- a/consensus/types/src/blob_sidecar.rs +++ b/consensus/types/src/blob_sidecar.rs @@ -1,16 +1,18 @@ use crate::test_utils::TestRandom; use crate::{Blob, EthSpec, Hash256, SignedRoot, Slot}; +use bls::Signature; use derivative::Derivative; use kzg::{KzgCommitment, KzgProof}; use serde_derive::{Deserialize, Serialize}; use ssz::Encode; use ssz_derive::{Decode, Encode}; use ssz_types::VariableList; +use std::sync::Arc; use test_random_derive::TestRandom; use tree_hash_derive::TreeHash; /// Container of the data that identifies an individual blob. -#[derive(Encode, Decode, Clone, Debug, PartialEq)] +#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq, Hash)] pub struct BlobIdentifier { pub block_root: Hash256, pub index: u64, @@ -34,7 +36,6 @@ pub struct BlobIdentifier { #[derivative(PartialEq, Hash(bound = "T: EthSpec"))] pub struct BlobSidecar { pub block_root: Hash256, - // TODO: fix the type, should fit in u8 as well #[serde(with = "eth2_serde_utils::quoted_u64")] pub index: u64, pub slot: Slot, @@ -48,10 +49,21 @@ pub struct BlobSidecar { } pub type BlobSidecarList = VariableList, ::MaxBlobsPerBlock>; +//TODO(sean) is there any other way around this? need it arc blobs for caching in multiple places +pub type BlobSidecarArcList = + VariableList>, ::MaxBlobsPerBlock>; +pub type Blobs = VariableList, ::MaxExtraDataBytes>; impl SignedRoot for BlobSidecar {} impl BlobSidecar { + pub fn id(&self) -> BlobIdentifier { + BlobIdentifier { + block_root: self.block_root, + index: self.index, + } + } + pub fn empty() -> Self { Self::default() } @@ -61,4 +73,4 @@ impl BlobSidecar { // Fixed part Self::empty().as_ssz_bytes().len() } -} \ No newline at end of file +} diff --git a/consensus/types/src/blobs_sidecar.rs b/consensus/types/src/blobs_sidecar.rs deleted file mode 100644 index e2560fb30b..0000000000 --- a/consensus/types/src/blobs_sidecar.rs +++ /dev/null @@ -1,62 +0,0 @@ -use crate::test_utils::TestRandom; -use crate::{Blob, EthSpec, Hash256, KzgCommitment, SignedRoot, Slot}; -use derivative::Derivative; -use kzg::KzgProof; -use serde_derive::{Deserialize, Serialize}; -use ssz::Encode; -use ssz_derive::{Decode, Encode}; -use ssz_types::VariableList; -use test_random_derive::TestRandom; -use tree_hash_derive::TreeHash; - -pub type KzgCommitments = VariableList::MaxBlobsPerBlock>; -pub type Blobs = VariableList, ::MaxBlobsPerBlock>; - -#[derive( - Debug, - Clone, - Serialize, - Deserialize, - Encode, - Decode, - TreeHash, - Default, - TestRandom, - Derivative, - arbitrary::Arbitrary, -)] -#[serde(bound = "T: EthSpec")] -#[arbitrary(bound = "T: EthSpec")] -#[derivative(PartialEq, Hash(bound = "T: EthSpec"))] -pub struct BlobsSidecar { - pub beacon_block_root: Hash256, - pub beacon_block_slot: Slot, - #[serde(with = "ssz_types::serde_utils::list_of_hex_fixed_vec")] - pub blobs: Blobs, - pub kzg_aggregated_proof: KzgProof, -} - -impl SignedRoot for BlobsSidecar {} - -impl BlobsSidecar { - pub fn empty() -> Self { - Self::default() - } - - pub fn empty_from_parts(beacon_block_root: Hash256, beacon_block_slot: Slot) -> Self { - Self { - beacon_block_root, - beacon_block_slot, - blobs: VariableList::empty(), - kzg_aggregated_proof: KzgProof::empty(), - } - } - - #[allow(clippy::integer_arithmetic)] - pub fn max_size() -> usize { - // Fixed part - Self::empty().as_ssz_bytes().len() - // Max size of variable length `blobs` field - + (T::max_blobs_per_block() * as Encode>::ssz_fixed_len()) - } -} diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index 1f947c9e7b..c107f790c5 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -14,7 +14,7 @@ pub enum Domain { BlsToExecutionChange, BeaconProposer, BeaconAttester, - BlobsSideCar, + BlobSidecar, Randao, Deposit, VoluntaryExit, @@ -100,7 +100,7 @@ pub struct ChainSpec { */ pub(crate) domain_beacon_proposer: u32, pub(crate) domain_beacon_attester: u32, - pub(crate) domain_blobs_sidecar: u32, + pub(crate) domain_blob_sidecar: u32, pub(crate) domain_randao: u32, pub(crate) domain_deposit: u32, pub(crate) domain_voluntary_exit: u32, @@ -366,7 +366,7 @@ impl ChainSpec { match domain { Domain::BeaconProposer => self.domain_beacon_proposer, Domain::BeaconAttester => self.domain_beacon_attester, - Domain::BlobsSideCar => self.domain_blobs_sidecar, + Domain::BlobSidecar => self.domain_blob_sidecar, Domain::Randao => self.domain_randao, Domain::Deposit => self.domain_deposit, Domain::VoluntaryExit => self.domain_voluntary_exit, @@ -574,7 +574,7 @@ impl ChainSpec { domain_voluntary_exit: 4, domain_selection_proof: 5, domain_aggregate_and_proof: 6, - domain_blobs_sidecar: 10, // 0x0a000000 + domain_blob_sidecar: 11, // 0x0B000000 /* * Fork choice @@ -809,7 +809,7 @@ impl ChainSpec { domain_voluntary_exit: 4, domain_selection_proof: 5, domain_aggregate_and_proof: 6, - domain_blobs_sidecar: 10, + domain_blob_sidecar: 11, /* * Fork choice @@ -1285,7 +1285,7 @@ mod tests { test_domain(Domain::BeaconProposer, spec.domain_beacon_proposer, &spec); test_domain(Domain::BeaconAttester, spec.domain_beacon_attester, &spec); - test_domain(Domain::BlobsSideCar, spec.domain_blobs_sidecar, &spec); + test_domain(Domain::BlobSidecar, spec.domain_blob_sidecar, &spec); test_domain(Domain::Randao, spec.domain_randao, &spec); test_domain(Domain::Deposit, spec.domain_deposit, &spec); test_domain(Domain::VoluntaryExit, spec.domain_voluntary_exit, &spec); @@ -1311,7 +1311,7 @@ mod tests { &spec, ); - test_domain(Domain::BlobsSideCar, spec.domain_blobs_sidecar, &spec); + test_domain(Domain::BlobSidecar, spec.domain_blob_sidecar, &spec); } fn apply_bit_mask(domain_bytes: [u8; 4], spec: &ChainSpec) -> u32 { diff --git a/consensus/types/src/config_and_preset.rs b/consensus/types/src/config_and_preset.rs index ac93818b9c..957376c3d6 100644 --- a/consensus/types/src/config_and_preset.rs +++ b/consensus/types/src/config_and_preset.rs @@ -78,7 +78,7 @@ pub fn get_extra_fields(spec: &ChainSpec) -> HashMap { "bls_withdrawal_prefix".to_uppercase() => u8_hex(spec.bls_withdrawal_prefix_byte), "domain_beacon_proposer".to_uppercase() => u32_hex(spec.domain_beacon_proposer), "domain_beacon_attester".to_uppercase() => u32_hex(spec.domain_beacon_attester), - "domain_blobs_sidecar".to_uppercase() => u32_hex(spec.domain_blobs_sidecar), + "domain_blob_sidecar".to_uppercase() => u32_hex(spec.domain_blob_sidecar), "domain_randao".to_uppercase()=> u32_hex(spec.domain_randao), "domain_deposit".to_uppercase()=> u32_hex(spec.domain_deposit), "domain_voluntary_exit".to_uppercase() => u32_hex(spec.domain_voluntary_exit), diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index e0db5419bb..c866442605 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -99,10 +99,7 @@ pub mod sqlite; pub mod beacon_block_and_blob_sidecars; pub mod blob_sidecar; -pub mod blobs_sidecar; pub mod signed_blob; -pub mod signed_block_and_blobs; -pub mod signed_block_contents; pub mod transaction; use ethereum_types::{H160, H256}; @@ -125,7 +122,6 @@ pub use crate::beacon_block_header::BeaconBlockHeader; pub use crate::beacon_committee::{BeaconCommittee, OwnedBeaconCommittee}; pub use crate::beacon_state::{BeaconTreeHashCache, Error as BeaconStateError, *}; pub use crate::blob_sidecar::{BlobSidecar, BlobSidecarList}; -pub use crate::blobs_sidecar::{Blobs, BlobsSidecar}; pub use crate::bls_to_execution_change::BlsToExecutionChange; pub use crate::chain_spec::{ChainSpec, Config, Domain}; pub use crate::checkpoint::Checkpoint; @@ -184,11 +180,6 @@ pub use crate::signed_beacon_block::{ }; pub use crate::signed_beacon_block_header::SignedBeaconBlockHeader; pub use crate::signed_blob::*; -pub use crate::signed_block_and_blobs::{ - SignedBeaconBlockAndBlobSidecars, SignedBeaconBlockAndBlobsSidecar, - SignedBeaconBlockAndBlobsSidecarDecode, -}; -pub use crate::signed_block_contents::SignedBlockContents; pub use crate::signed_bls_to_execution_change::SignedBlsToExecutionChange; pub use crate::signed_contribution_and_proof::SignedContributionAndProof; pub use crate::signed_voluntary_exit::SignedVoluntaryExit; @@ -228,4 +219,4 @@ pub use bls::{ pub use kzg::{KzgCommitment, KzgProof}; pub use ssz_types::{typenum, typenum::Unsigned, BitList, BitVector, FixedVector, VariableList}; -pub use superstruct::superstruct; \ No newline at end of file +pub use superstruct::superstruct; diff --git a/consensus/types/src/signed_beacon_block.rs b/consensus/types/src/signed_beacon_block.rs index 00aad61ff4..52800f0782 100644 --- a/consensus/types/src/signed_beacon_block.rs +++ b/consensus/types/src/signed_beacon_block.rs @@ -250,28 +250,6 @@ impl> SignedBeaconBlock pub fn canonical_root(&self) -> Hash256 { self.message().tree_hash_root() } - - /// Reconstructs an empty `BlobsSidecar`, using the given block root if provided, else calculates it. - /// If this block has kzg commitments, an error will be returned. If this block is from prior to the - /// Eip4844 fork, this will error. - pub fn reconstruct_empty_blobs( - &self, - block_root_opt: Option, - ) -> Result, BlobReconstructionError> { - let kzg_commitments = self - .message() - .body() - .blob_kzg_commitments() - .map_err(|_| BlobReconstructionError::InconsistentFork)?; - if kzg_commitments.is_empty() { - Ok(BlobsSidecar::empty_from_parts( - block_root_opt.unwrap_or_else(|| self.canonical_root()), - self.slot(), - )) - } else { - Err(BlobReconstructionError::UnavailableBlobs) - } - } } // We can convert pre-Bellatrix blocks without payloads into blocks with payloads. @@ -581,4 +559,4 @@ mod test { assert_eq!(reconstructed, block); } } -} \ No newline at end of file +} diff --git a/consensus/types/src/signed_blob.rs b/consensus/types/src/signed_blob.rs index c295f6c6c1..6b2279ce89 100644 --- a/consensus/types/src/signed_blob.rs +++ b/consensus/types/src/signed_blob.rs @@ -1,11 +1,14 @@ use crate::{ - test_utils::TestRandom, BlobSidecar, ChainSpec, EthSpec, Fork, Hash256, PublicKey, Signature, - SignedRoot, + test_utils::TestRandom, BlobSidecar, ChainSpec, Domain, EthSpec, Fork, Hash256, Signature, + SignedRoot, SigningData, }; +use bls::PublicKey; use derivative::Derivative; use serde_derive::{Deserialize, Serialize}; use ssz_derive::{Decode, Encode}; +use ssz_types::VariableList; use test_random_derive::TestRandom; +use tree_hash::TreeHash; use tree_hash_derive::TreeHash; #[derive( @@ -29,18 +32,39 @@ pub struct SignedBlobSidecar { pub signature: Signature, } -impl SignedRoot for SignedBlobSidecar {} +pub type SignedBlobSidecarList = + VariableList, ::MaxBlobsPerBlock>; impl SignedBlobSidecar { + /// Verify `self.signature`. + /// + /// If the root of `block.message` is already known it can be passed in via `object_root_opt`. + /// Otherwise, it will be computed locally. pub fn verify_signature( &self, - _object_root_opt: Option, - _pubkey: &PublicKey, - _fork: &Fork, - _genesis_validators_root: Hash256, - _spec: &ChainSpec, + object_root_opt: Option, + pubkey: &PublicKey, + fork: &Fork, + genesis_validators_root: Hash256, + spec: &ChainSpec, ) -> bool { - // TODO (pawan): fill up logic - unimplemented!() + let domain = spec.get_domain( + self.message.slot.epoch(T::slots_per_epoch()), + Domain::BlobSidecar, + fork, + genesis_validators_root, + ); + + let message = if let Some(object_root) = object_root_opt { + SigningData { + object_root, + domain, + } + .tree_hash_root() + } else { + self.message.signing_root(domain) + }; + + self.signature.verify(pubkey, message) } } diff --git a/consensus/types/src/signed_block_and_blobs.rs b/consensus/types/src/signed_block_and_blobs.rs deleted file mode 100644 index c6d154ef0f..0000000000 --- a/consensus/types/src/signed_block_and_blobs.rs +++ /dev/null @@ -1,47 +0,0 @@ -use crate::{ - AbstractExecPayload, BlobsSidecar, EthSpec, SignedBeaconBlock, SignedBeaconBlockEip4844, - SignedBlobSidecar, -}; -use derivative::Derivative; -use serde_derive::{Deserialize, Serialize}; -use ssz::{Decode, DecodeError}; -use ssz_derive::{Decode, Encode}; -use ssz_types::VariableList; -use std::sync::Arc; -use tree_hash_derive::TreeHash; - -#[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode, TreeHash, PartialEq)] -#[serde(bound = "T: EthSpec")] -pub struct SignedBeaconBlockAndBlobsSidecarDecode { - pub beacon_block: SignedBeaconBlockEip4844, - pub blobs_sidecar: BlobsSidecar, -} - -// TODO: will be removed once we decouple blobs in Gossip -#[derive(Debug, Clone, Serialize, Deserialize, Encode, TreeHash, Derivative)] -#[derivative(PartialEq, Hash(bound = "T: EthSpec"))] -pub struct SignedBeaconBlockAndBlobsSidecar { - pub beacon_block: Arc>, - pub blobs_sidecar: Arc>, -} - -impl SignedBeaconBlockAndBlobsSidecar { - pub fn from_ssz_bytes(bytes: &[u8]) -> Result { - let SignedBeaconBlockAndBlobsSidecarDecode { - beacon_block, - blobs_sidecar, - } = SignedBeaconBlockAndBlobsSidecarDecode::from_ssz_bytes(bytes)?; - Ok(SignedBeaconBlockAndBlobsSidecar { - beacon_block: Arc::new(SignedBeaconBlock::Eip4844(beacon_block)), - blobs_sidecar: Arc::new(blobs_sidecar), - }) - } -} - -#[derive(Debug, Clone, Serialize, Deserialize, Encode, TreeHash, Derivative)] -#[derivative(PartialEq, Hash(bound = "T: EthSpec"))] -#[serde(bound = "T: EthSpec")] -pub struct SignedBeaconBlockAndBlobSidecars> { - pub signed_block: SignedBeaconBlock, - pub signed_blob_sidecars: VariableList, ::MaxBlobsPerBlock>, -} diff --git a/consensus/types/src/signed_block_contents.rs b/consensus/types/src/signed_block_contents.rs deleted file mode 100644 index bce6233338..0000000000 --- a/consensus/types/src/signed_block_contents.rs +++ /dev/null @@ -1,49 +0,0 @@ -use crate::signed_block_and_blobs::SignedBeaconBlockAndBlobSidecars; -use crate::{AbstractExecPayload, EthSpec, FullPayload, SignedBeaconBlock, SignedBlobSidecar}; -use derivative::Derivative; -use serde_derive::{Deserialize, Serialize}; -use ssz_types::VariableList; - -/// A wrapper over a [`SignedBeaconBlock`] or a [`SignedBeaconBlockAndBlobSidecars`]. -#[derive(Clone, Debug, Derivative, Serialize, Deserialize)] -#[derivative(PartialEq, Hash(bound = "T: EthSpec"))] -#[serde(untagged)] -#[serde(bound = "T: EthSpec")] -pub enum SignedBlockContents = FullPayload> { - BlockAndBlobSidecars(SignedBeaconBlockAndBlobSidecars), - Block(SignedBeaconBlock), -} - -impl> SignedBlockContents { - pub fn signed_block(&self) -> &SignedBeaconBlock { - match self { - SignedBlockContents::BlockAndBlobSidecars(block_and_sidecars) => { - &block_and_sidecars.signed_block - } - SignedBlockContents::Block(block) => block, - } - } - - pub fn deconstruct( - self, - ) -> ( - SignedBeaconBlock, - Option, ::MaxBlobsPerBlock>>, - ) { - match self { - SignedBlockContents::BlockAndBlobSidecars(block_and_sidecars) => ( - block_and_sidecars.signed_block, - Some(block_and_sidecars.signed_blob_sidecars), - ), - SignedBlockContents::Block(block) => (block, None), - } - } -} - -impl> From> - for SignedBlockContents -{ - fn from(block: SignedBeaconBlock) -> Self { - SignedBlockContents::Block(block) - } -} diff --git a/lcli/src/parse_ssz.rs b/lcli/src/parse_ssz.rs index 0e87e330b1..70f34e09e3 100644 --- a/lcli/src/parse_ssz.rs +++ b/lcli/src/parse_ssz.rs @@ -63,7 +63,7 @@ pub fn run_parse_ssz(matches: &ArgMatches) -> Result<(), String> { "state_merge" => decode_and_print::>(&bytes, format)?, "state_capella" => decode_and_print::>(&bytes, format)?, "state_eip4844" => decode_and_print::>(&bytes, format)?, - "blobs_sidecar" => decode_and_print::>(&bytes, format)?, + "blob_sidecar" => decode_and_print::>(&bytes, format)?, other => return Err(format!("Unknown type: {}", other)), }; diff --git a/testing/ef_tests/src/type_name.rs b/testing/ef_tests/src/type_name.rs index 19b3535fbf..d94dfef485 100644 --- a/testing/ef_tests/src/type_name.rs +++ b/testing/ef_tests/src/type_name.rs @@ -50,7 +50,7 @@ type_name_generic!(BeaconBlockBodyCapella, "BeaconBlockBody"); type_name_generic!(BeaconBlockBodyEip4844, "BeaconBlockBody"); type_name!(BeaconBlockHeader); type_name_generic!(BeaconState); -type_name_generic!(BlobsSidecar); +type_name_generic!(BlobSidecar); type_name!(Checkpoint); type_name_generic!(ContributionAndProof); type_name!(Deposit); @@ -88,9 +88,5 @@ type_name!(Validator); type_name!(VoluntaryExit); type_name!(Withdrawal); type_name!(BlsToExecutionChange, "BLSToExecutionChange"); -type_name_generic!( - SignedBeaconBlockAndBlobsSidecarDecode, - "SignedBeaconBlockAndBlobsSidecar" -); type_name!(SignedBlsToExecutionChange, "SignedBLSToExecutionChange"); type_name!(HistoricalSummary); diff --git a/validator_client/src/block_service.rs b/validator_client/src/block_service.rs index eb40dee9b3..5fa32d3f42 100644 --- a/validator_client/src/block_service.rs +++ b/validator_client/src/block_service.rs @@ -6,8 +6,11 @@ use crate::{ OfflineOnFailure, }; use crate::{http_metrics::metrics, validator_store::ValidatorStore}; +use bls::SignatureBytes; use environment::RuntimeContext; -use slog::{crit, debug, error, info, trace, warn}; +use eth2::types::{BlockContents, SignedBlockContents}; +use eth2::BeaconNodeHttpClient; +use slog::{crit, debug, error, info, trace, warn, Logger}; use slot_clock::SlotClock; use std::ops::Deref; use std::sync::Arc; @@ -15,8 +18,8 @@ use std::time::Duration; use tokio::sync::mpsc; use tokio::time::sleep; use types::{ - AbstractExecPayload, BeaconBlock, BlindedPayload, BlockType, EthSpec, FullPayload, Graffiti, - PublicKeyBytes, SignedBlockContents, Slot, + AbstractExecPayload, BlindedPayload, BlockType, EthSpec, FullPayload, Graffiti, PublicKeyBytes, + Slot, }; #[derive(Debug)] @@ -341,81 +344,46 @@ impl BlockService { "slot" => slot.as_u64(), ); // Request block from first responsive beacon node. - let block = self + let block_contents = self .beacon_nodes .first_success( RequireSynced::No, OfflineOnFailure::Yes, - |beacon_node| async move { - let block: BeaconBlock = match Payload::block_type() { - BlockType::Full => { - let _get_timer = metrics::start_timer_vec( - &metrics::BLOCK_SERVICE_TIMES, - &[metrics::BEACON_BLOCK_HTTP_GET], - ); - beacon_node - .get_validator_blocks::( - slot, - randao_reveal_ref, - graffiti.as_ref(), - ) - .await - .map_err(|e| { - BlockError::Recoverable(format!( - "Error from beacon node when producing block: {:?}", - e - )) - })? - .data - .into() - } - BlockType::Blinded => { - let _get_timer = metrics::start_timer_vec( - &metrics::BLOCK_SERVICE_TIMES, - &[metrics::BLINDED_BEACON_BLOCK_HTTP_GET], - ); - beacon_node - .get_validator_blinded_blocks::( - slot, - randao_reveal_ref, - graffiti.as_ref(), - ) - .await - .map_err(|e| { - BlockError::Recoverable(format!( - "Error from beacon node when producing block: {:?}", - e - )) - })? - .data - .into() - } - }; - - info!( + move |beacon_node| { + Self::get_validator_block( + beacon_node, + slot, + randao_reveal_ref, + graffiti, + proposer_index, log, - "Received unsigned block"; - "slot" => slot.as_u64(), - ); - if proposer_index != Some(block.proposer_index()) { - return Err(BlockError::Recoverable( - "Proposer index does not match block proposer. Beacon chain re-orged" - .to_string(), - )); - } - - Ok::<_, BlockError>(block) + ) }, ) .await?; + let (block, maybe_blob_sidecars) = block_contents.deconstruct(); let signing_timer = metrics::start_timer(&metrics::BLOCK_SIGNING_TIMES); - let signed_block_contents: SignedBlockContents = self_ref + + let signed_block = self_ref .validator_store .sign_block::(*validator_pubkey_ref, block, current_slot) .await - .map_err(|e| BlockError::Recoverable(format!("Unable to sign block: {:?}", e)))? - .into(); + .map_err(|e| BlockError::Recoverable(format!("Unable to sign block: {:?}", e)))?; + + let maybe_signed_blobs = match maybe_blob_sidecars { + Some(blob_sidecars) => Some( + self_ref + .validator_store + .sign_blobs(*validator_pubkey_ref, blob_sidecars) + .await + .map_err(|e| { + BlockError::Recoverable(format!("Unable to sign blob: {:?}", e)) + })?, + ), + None => None, + }; + let signing_time_ms = Duration::from_secs_f64(signing_timer.map_or(0.0, |t| t.stop_and_record())).as_millis(); @@ -426,46 +394,19 @@ impl BlockService { "signing_time_ms" => signing_time_ms, ); + let signed_block_contents = SignedBlockContents::from((signed_block, maybe_signed_blobs)); + // Publish block with first available beacon node. self.beacon_nodes .first_success( RequireSynced::No, OfflineOnFailure::Yes, |beacon_node| async { - match Payload::block_type() { - BlockType::Full => { - let _post_timer = metrics::start_timer_vec( - &metrics::BLOCK_SERVICE_TIMES, - &[metrics::BEACON_BLOCK_HTTP_POST], - ); - beacon_node - .post_beacon_blocks(&signed_block_contents) - .await - .map_err(|e| { - BlockError::Irrecoverable(format!( - "Error from beacon node when publishing block: {:?}", - e - )) - })? - } - BlockType::Blinded => { - let _post_timer = metrics::start_timer_vec( - &metrics::BLOCK_SERVICE_TIMES, - &[metrics::BLINDED_BEACON_BLOCK_HTTP_POST], - ); - beacon_node - // TODO: need to be adjusted for blobs - .post_beacon_blinded_blocks(&signed_block_contents.signed_block()) - .await - .map_err(|e| { - BlockError::Irrecoverable(format!( - "Error from beacon node when publishing block: {:?}", - e - )) - })? - } - } - Ok::<_, BlockError>(()) + Self::publish_signed_block_contents::( + &signed_block_contents, + beacon_node, + ) + .await }, ) .await?; @@ -482,4 +423,106 @@ impl BlockService { Ok(()) } + + async fn publish_signed_block_contents>( + signed_block_contents: &SignedBlockContents, + beacon_node: &BeaconNodeHttpClient, + ) -> Result<(), BlockError> { + match Payload::block_type() { + BlockType::Full => { + let _post_timer = metrics::start_timer_vec( + &metrics::BLOCK_SERVICE_TIMES, + &[metrics::BEACON_BLOCK_HTTP_POST], + ); + beacon_node + .post_beacon_blocks(signed_block_contents) + .await + .map_err(|e| { + BlockError::Irrecoverable(format!( + "Error from beacon node when publishing block: {:?}", + e + )) + })? + } + BlockType::Blinded => { + let _post_timer = metrics::start_timer_vec( + &metrics::BLOCK_SERVICE_TIMES, + &[metrics::BLINDED_BEACON_BLOCK_HTTP_POST], + ); + todo!("need to be adjusted for blobs"); + // beacon_node + // .post_beacon_blinded_blocks(signed_block_contents.signed_block()) + // .await + // .map_err(|e| { + // BlockError::Irrecoverable(format!( + // "Error from beacon node when publishing block: {:?}", + // e + // )) + // })? + } + } + Ok::<_, BlockError>(()) + } + + async fn get_validator_block>( + beacon_node: &BeaconNodeHttpClient, + slot: Slot, + randao_reveal_ref: &SignatureBytes, + graffiti: Option, + proposer_index: Option, + log: &Logger, + ) -> Result, BlockError> { + let block_contents: BlockContents = match Payload::block_type() { + BlockType::Full => { + let _get_timer = metrics::start_timer_vec( + &metrics::BLOCK_SERVICE_TIMES, + &[metrics::BEACON_BLOCK_HTTP_GET], + ); + beacon_node + .get_validator_blocks::(slot, randao_reveal_ref, graffiti.as_ref()) + .await + .map_err(|e| { + BlockError::Recoverable(format!( + "Error from beacon node when producing block: {:?}", + e + )) + })? + .data + } + BlockType::Blinded => { + let _get_timer = metrics::start_timer_vec( + &metrics::BLOCK_SERVICE_TIMES, + &[metrics::BLINDED_BEACON_BLOCK_HTTP_GET], + ); + todo!("implement blinded flow for blobs"); + // beacon_node + // .get_validator_blinded_blocks::( + // slot, + // randao_reveal_ref, + // graffiti.as_ref(), + // ) + // .await + // .map_err(|e| { + // BlockError::Recoverable(format!( + // "Error from beacon node when producing block: {:?}", + // e + // )) + // })? + // .data + } + }; + + info!( + log, + "Received unsigned block"; + "slot" => slot.as_u64(), + ); + if proposer_index != Some(block_contents.block().proposer_index()) { + return Err(BlockError::Recoverable( + "Proposer index does not match block proposer. Beacon chain re-orged".to_string(), + )); + } + + Ok::<_, BlockError>(block_contents) + } } diff --git a/validator_client/src/signing_method.rs b/validator_client/src/signing_method.rs index ae9df08096..e428bffcff 100644 --- a/validator_client/src/signing_method.rs +++ b/validator_client/src/signing_method.rs @@ -37,6 +37,7 @@ pub enum Error { pub enum SignableMessage<'a, T: EthSpec, Payload: AbstractExecPayload = FullPayload> { RandaoReveal(Epoch), BeaconBlock(&'a BeaconBlock), + BlobSidecar(&'a BlobSidecar), AttestationData(&'a AttestationData), SignedAggregateAndProof(&'a AggregateAndProof), SelectionProof(Slot), @@ -58,6 +59,7 @@ impl<'a, T: EthSpec, Payload: AbstractExecPayload> SignableMessage<'a, T, Pay match self { SignableMessage::RandaoReveal(epoch) => epoch.signing_root(domain), SignableMessage::BeaconBlock(b) => b.signing_root(domain), + SignableMessage::BlobSidecar(b) => b.signing_root(domain), SignableMessage::AttestationData(a) => a.signing_root(domain), SignableMessage::SignedAggregateAndProof(a) => a.signing_root(domain), SignableMessage::SelectionProof(slot) => slot.signing_root(domain), @@ -180,6 +182,10 @@ impl SigningMethod { Web3SignerObject::RandaoReveal { epoch } } SignableMessage::BeaconBlock(block) => Web3SignerObject::beacon_block(block)?, + SignableMessage::BlobSidecar(_) => { + // https://github.com/ConsenSys/web3signer/issues/726 + unimplemented!("Web3Signer blob signing not implemented.") + } SignableMessage::AttestationData(a) => Web3SignerObject::Attestation(a), SignableMessage::SignedAggregateAndProof(a) => { Web3SignerObject::AggregateAndProof(a) diff --git a/validator_client/src/validator_store.rs b/validator_client/src/validator_store.rs index 36a0d05734..294689e3c1 100644 --- a/validator_client/src/validator_store.rs +++ b/validator_client/src/validator_store.rs @@ -6,6 +6,7 @@ use crate::{ Config, }; use account_utils::{validator_definitions::ValidatorDefinition, ZeroizeString}; +use eth2::types::VariableList; use parking_lot::{Mutex, RwLock}; use slashing_protection::{ interchange::Interchange, InterchangeError, NotSafe, Safe, SlashingDatabase, @@ -19,11 +20,12 @@ use std::sync::Arc; use task_executor::TaskExecutor; use types::{ attestation::Error as AttestationError, graffiti::GraffitiString, AbstractExecPayload, Address, - AggregateAndProof, Attestation, BeaconBlock, BlindedPayload, ChainSpec, ContributionAndProof, - Domain, Epoch, EthSpec, Fork, Graffiti, Hash256, Keypair, PublicKeyBytes, SelectionProof, - Signature, SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedRoot, - SignedValidatorRegistrationData, Slot, SyncAggregatorSelectionData, SyncCommitteeContribution, - SyncCommitteeMessage, SyncSelectionProof, SyncSubnetId, ValidatorRegistrationData, + AggregateAndProof, Attestation, BeaconBlock, BlindedPayload, BlobSidecarList, ChainSpec, + ContributionAndProof, Domain, Epoch, EthSpec, Fork, Graffiti, Hash256, Keypair, PublicKeyBytes, + SelectionProof, Signature, SignedAggregateAndProof, SignedBeaconBlock, SignedBlobSidecar, + SignedBlobSidecarList, SignedContributionAndProof, SignedRoot, SignedValidatorRegistrationData, + Slot, SyncAggregatorSelectionData, SyncCommitteeContribution, SyncCommitteeMessage, + SyncSelectionProof, SyncSubnetId, ValidatorRegistrationData, }; use validator_dir::ValidatorDir; @@ -531,6 +533,39 @@ impl ValidatorStore { } } + pub async fn sign_blobs( + &self, + validator_pubkey: PublicKeyBytes, + blob_sidecars: BlobSidecarList, + ) -> Result, Error> { + let mut signed_blob_sidecars = Vec::new(); + + for blob_sidecar in blob_sidecars.into_iter() { + let slot = blob_sidecar.slot; + let signing_epoch = slot.epoch(E::slots_per_epoch()); + let signing_context = self.signing_context(Domain::BlobSidecar, signing_epoch); + let signing_method = self.doppelganger_checked_signing_method(validator_pubkey)?; + + let signature = signing_method + .get_signature::>( + SignableMessage::BlobSidecar(&blob_sidecar), + signing_context, + &self.spec, + &self.task_executor, + ) + .await?; + + metrics::inc_counter_vec(&metrics::SIGNED_BLOBS_TOTAL, &[metrics::SUCCESS]); + + signed_blob_sidecars.push(SignedBlobSidecar { + message: blob_sidecar, + signature, + }); + } + + Ok(VariableList::from(signed_blob_sidecars)) + } + pub async fn sign_attestation( &self, validator_pubkey: PublicKeyBytes,