From be4d70eeffc3e7e4542480445a10480558836d34 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Thu, 16 Mar 2023 22:25:04 -0400 Subject: [PATCH] tons of changes, just tryna compile --- beacon_node/beacon_chain/src/beacon_chain.rs | 170 ++++++++---------- .../beacon_chain/src/blob_verification.rs | 28 +-- .../beacon_chain/src/block_verification.rs | 19 +- beacon_node/beacon_chain/src/builder.rs | 28 ++- .../beacon_chain/src/execution_payload.rs | 4 +- .../beacon_chain/src/gossip_blob_cache.rs | 77 +++++--- beacon_node/beacon_chain/src/lib.rs | 2 +- beacon_node/client/src/builder.rs | 9 +- beacon_node/http_api/src/block_id.rs | 8 +- .../http_api/src/build_block_contents.rs | 2 +- beacon_node/http_api/src/publish_blocks.rs | 28 +-- .../beacon_processor/worker/gossip_methods.rs | 10 +- .../beacon_processor/worker/rpc_methods.rs | 118 ++++++------ .../beacon_processor/worker/sync_methods.rs | 29 +-- .../state_processing/src/consensus_context.rs | 2 +- lcli/src/parse_ssz.rs | 2 +- testing/ef_tests/src/type_name.rs | 6 +- 17 files changed, 292 insertions(+), 250 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 482de67043..5039724649 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -8,7 +8,7 @@ use crate::beacon_proposer_cache::compute_proposer_duties_from_head; use crate::beacon_proposer_cache::BeaconProposerCache; use crate::blob_cache::BlobCache; use crate::blob_verification::{ - AsBlock, AvailableBlock, BlobError, Blobs, BlockWrapper, IntoAvailableBlock, + AsBlock, AvailableBlock, BlobError, BlockWrapper, IntoAvailableBlock, VerifiedBlobs, }; use crate::block_times_cache::BlockTimesCache; use crate::block_verification::{ @@ -25,6 +25,7 @@ use crate::eth1_finalization_cache::{Eth1FinalizationCache, Eth1FinalizationData use crate::events::ServerSentEventHandler; use crate::execution_payload::{get_execution_payload, NotifyExecutionLayer, PreparePayloadHandle}; use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx, ForkChoiceWaitResult}; +use crate::gossip_blob_cache::DataAvailabilityChecker; use crate::head_tracker::HeadTracker; use crate::historical_blocks::HistoricalBlockError; use crate::kzg_utils; @@ -76,6 +77,7 @@ use futures::channel::mpsc::Sender; use itertools::process_results; use itertools::Itertools; use kzg::Kzg; +use oneshot_broadcast::Receiver; use operation_pool::{AttestationRef, OperationPool, PersistedOperationPool, ReceivedPreCapella}; use parking_lot::{Mutex, RwLock}; use proto_array::{CountUnrealizedFull, DoNotReOrg, ProposerHeadError}; @@ -112,7 +114,9 @@ use store::{ use task_executor::{ShutdownReason, TaskExecutor}; use tokio::task::JoinHandle; use tree_hash::TreeHash; +use types::beacon_block_body::KzgCommitments; use types::beacon_state::CloneConfig; +use types::blob_sidecar::Blobs; use types::consts::eip4844::MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS; use types::consts::merge::INTERVALS_PER_SLOT; use types::*; @@ -276,7 +280,7 @@ pub enum StateSkipConfig { pub enum BlockProcessingResult { Verified(Hash256), - AvailabilityPending(ExecutedBlock), + AvailabilityPending(ExecutedBlock), } pub trait BeaconChainTypes: Send + Sync + 'static { @@ -440,7 +444,8 @@ pub struct BeaconChain { pub slasher: Option>>, /// Provides monitoring of a set of explicitly defined validators. pub validator_monitor: RwLock>, - pub blob_cache: BlobCache, + pub proposal_blob_cache: BlobCache, + pub data_availability_checker: Option>, pub kzg: Option>, } @@ -971,35 +976,9 @@ impl BeaconChain { pub async fn get_block_and_blobs_checking_early_attester_cache( &self, block_root: &Hash256, - ) -> Result>, Error> { - // If there is no data availability boundary, the Eip4844 fork is disabled. - if let Some(finalized_data_availability_boundary) = - self.finalized_data_availability_boundary() - { - // Only use the attester cache if we can find both the block and blob - if let (Some(block), Some(blobs)) = ( - self.early_attester_cache.get_block(*block_root), - self.early_attester_cache.get_blobs(*block_root), - ) { - Ok(Some(SignedBeaconBlockAndBlobsSidecar { - beacon_block: block, - blobs_sidecar: blobs, - })) - // Attempt to get the block and blobs from the database - } else if let Some(block) = self.get_block(block_root).await?.map(Arc::new) { - let blobs = self - .get_blobs(block_root, finalized_data_availability_boundary)? - .map(Arc::new); - Ok(blobs.map(|blobs| SignedBeaconBlockAndBlobsSidecar { - beacon_block: block, - blobs_sidecar: blobs, - })) - } else { - Ok(None) - } - } else { - Ok(None) - } + ) -> Result, Error> { + //TODO(sean) use the rpc blobs cache and revert this to the current block cache logic + Ok(Some(())) } /// Returns the block at the given root, if any. @@ -1083,7 +1062,7 @@ impl BeaconChain { &self, block_root: &Hash256, ) -> Result>, Error> { - self.store.get_blobs(block_root) + Ok(self.store.get_blobs(block_root)?) } pub fn get_blinded_block( @@ -2674,7 +2653,7 @@ impl BeaconChain { unverified_block: B, count_unrealized: CountUnrealized, notify_execution_layer: NotifyExecutionLayer, - ) -> Result, BlockError> { + ) -> Result> { // Start the Prometheus timer. let _full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES); @@ -2692,19 +2671,21 @@ impl BeaconChain { // TODO(log required errors) let executed_block = self + .clone() .into_executed_block(execution_pending, count_unrealized) .await?; // Check if the executed block has all it's blobs available to qualify as a fully // available block - let import_block = if let Ok(blobs) = self - .gossip_blob_cache - .lock() - .blobs(executed_block.block_root) - { - self.import_available_block(executed_block, blobs, count_unrealized) + let import_block = if let Some(da_checker) = self.data_availability_checker.as_ref() { + da_checker.put_block(executed_block); //TODO(sean) errors + return Err(BlockError::AvailabilityPending(block_root)); } else { - return Ok(BlockProcessingResult::AvailabilityPending(executed_block)); + self.clone().import_available_block( + executed_block, + VerifiedBlobs::PreEip4844, + count_unrealized, + ) }; // Verify and import the block. @@ -2721,7 +2702,7 @@ impl BeaconChain { // Increment the Prometheus counter for block processing successes. metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES); - Ok(BlockProcessingResult::Verified(block_root)) + Ok(block_root) } Err(e @ BlockError::BeaconChainError(BeaconChainError::TokioJoin(_))) => { debug!( @@ -2761,7 +2742,7 @@ impl BeaconChain { self: Arc, execution_pending_block: ExecutionPendingBlock, count_unrealized: CountUnrealized, - ) -> Result, BlockError> { + ) -> Result, BlockError> { let ExecutionPendingBlock { block, block_root, @@ -2826,8 +2807,8 @@ impl BeaconChain { /// (i.e., this function is not atomic). async fn import_available_block( self: Arc, - executed_block: ExecutedBlock, - blobs: Blobs, + executed_block: ExecutedBlock, + blobs: VerifiedBlobs, count_unrealized: CountUnrealized, ) -> Result> { let ExecutedBlock { @@ -4802,12 +4783,11 @@ impl BeaconChain { .as_ref() .ok_or(BlockProductionError::TrustedSetupNotInitialized)?; let beacon_block_root = block.canonical_root(); - let expected_kzg_commitments: &KzgCommitments = - block.body().blob_kzg_commitments().map_err(|_| { - BlockProductionError::InvalidBlockVariant( - "EIP4844 block does not contain kzg commitments".to_string(), - ) - })?; + let expected_kzg_commitments = block.body().blob_kzg_commitments().map_err(|_| { + BlockProductionError::InvalidBlockVariant( + "EIP4844 block does not contain kzg commitments".to_string(), + ) + })?; if expected_kzg_commitments.len() != blobs.len() { return Err(BlockProductionError::MissingKzgCommitment(format!( @@ -4856,7 +4836,8 @@ impl BeaconChain { .collect::>, BlockProductionError>>()?, ); - self.blob_cache.put(beacon_block_root, blob_sidecars); + self.proposal_blob_cache + .put(beacon_block_root, blob_sidecars); } metrics::inc_counter(&metrics::BLOCK_PRODUCTION_SUCCESSES); @@ -4874,8 +4855,8 @@ impl BeaconChain { fn compute_blob_kzg_proofs( kzg: &Arc, - blobs: &Blobs<::EthSpec>, - expected_kzg_commitments: &KzgCommitments<::EthSpec>, + blobs: &Blobs, + expected_kzg_commitments: &KzgCommitments, slot: Slot, ) -> Result, BlockProductionError> { blobs @@ -6174,47 +6155,50 @@ impl BeaconChain { .unwrap_or(false)) } - pub async fn check_data_availability( - &self, - block: Arc>, - ) -> Result, Error> { - let kzg_commitments = block - .message() - .body() - .blob_kzg_commitments() - .map_err(|_| BlobError::KzgCommitmentMissing)?; - let transactions = block - .message() - .body() - .execution_payload_eip4844() - .map(|payload| payload.transactions()) - .map_err(|_| BlobError::TransactionsMissing)? - .ok_or(BlobError::TransactionsMissing)?; + pub fn start_block_importer( + self: &Arc, + mut rx: tokio::sync::mpsc::Receiver>, + ) { + let chain = self.clone(); + self.task_executor.spawn( + async move { + while let Some(block) = rx.recv().await { + let ExecutedBlock { + block, + block_root, + state, + parent_block, + parent_eth1_finalization_data, + confirmed_state_roots, + consensus_context, + payload_verification_outcome, + } = block; - if verify_kzg_commitments_against_transactions::(transactions, kzg_commitments) - .is_err() - { - return Err(BlobError::TransactionCommitmentMismatch); - } + let available_block = block.into_available_block().unwrap(); //TODO(sean) remove unwrap - // Validate that the kzg proof is valid against the commitments and blobs - let kzg = self - .kzg - .as_ref() - .ok_or(BlobError::TrustedSetupNotInitialized)?; - - if !kzg_utils::validate_blobs_sidecar( - kzg, - block_slot, - block_root, - kzg_commitments, - blob_sidecar, - ) - .map_err(BlobError::KzgError)? - { - return Err(BlobError::InvalidKzgProof); - } - Ok(()) + let chain_inner = chain.clone(); + let block_hash = chain + .spawn_blocking_handle( + move || { + chain_inner.import_block( + available_block, + block_root, + state, + confirmed_state_roots, + payload_verification_outcome.payload_verification_status, + CountUnrealized::True, //TODO(sean) + parent_block, + parent_eth1_finalization_data, + consensus_context, + ) + }, + "block_importer", + ) + .await; + } + }, + "block_importer_listener", + ); } } diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index 35fb4c1429..013fae1fd5 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -1,3 +1,4 @@ +use derivative::Derivative; use slot_clock::SlotClock; use std::sync::Arc; @@ -301,30 +302,36 @@ impl IntoAvailableBlock for BlockWrapper { } } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Derivative)] +#[derivative(Hash(bound = "T: EthSpec"))] pub struct AvailableBlock { pub block: Arc>, - pub blobs: Blobs, + pub blobs: VerifiedBlobs, } impl AvailableBlock { pub fn blobs(&self) -> Option>> { match &self.blobs { - Blobs::NotRequired | Blobs::None => None, - Blobs::Available(blobs) => Some(blobs.clone()), + VerifiedBlobs::EmptyBlobs | VerifiedBlobs::NotRequired | VerifiedBlobs::PreEip4844 => { + None + } + VerifiedBlobs::Available(blobs) => Some(blobs.clone()), } } pub fn deconstruct(self) -> (Arc>, Option>>) { match self.blobs { - Blobs::NotRequired | Blobs::None => (self.block, None), - Blobs::Available(blobs) => (self.block, Some(blobs)), + VerifiedBlobs::EmptyBlobs | VerifiedBlobs::NotRequired | VerifiedBlobs::PreEip4844 => { + (self.block, None) + } + VerifiedBlobs::Available(blobs) => (self.block, Some(blobs)), } } } -#[derive(Clone, Debug, PartialEq)] -pub enum Blobs { +#[derive(Clone, Debug, PartialEq, Derivative)] +#[derivative(Hash(bound = "E: EthSpec"))] +pub enum VerifiedBlobs { /// These blobs are available. Available(Arc>), /// This block is from outside the data availability boundary so doesn't require @@ -333,7 +340,7 @@ pub enum Blobs { /// The block's `kzg_commitments` field is empty so it does not contain any blobs. EmptyBlobs, /// This is a block prior to the 4844 fork, so doesn't require any blobs - None, + PreEip4844, } pub trait AsBlock { @@ -348,7 +355,8 @@ pub trait AsBlock { fn canonical_root(&self) -> Hash256; } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Derivative)] +#[derivative(Hash(bound = "E: EthSpec"))] pub enum BlockWrapper { /// This variant is fully available. /// i.e. for pre-4844 blocks, it contains a (`SignedBeaconBlock`, `Blobs::None`) and for diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 544f484663..4ee15663f1 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -64,6 +64,7 @@ use crate::{ }, metrics, BeaconChain, BeaconChainError, BeaconChainTypes, }; +use derivative::Derivative; use eth2::types::EventKind; use execution_layer::PayloadStatus; use fork_choice::{AttestationFromBlock, PayloadVerificationStatus}; @@ -306,6 +307,7 @@ pub enum BlockError { parent_root: Hash256, }, BlobValidation(BlobError), + AvailabilityPending(Hash256), } impl From for BlockError { @@ -487,6 +489,7 @@ impl From for BlockError { } /// Stores information about verifying a payload against an execution engine. +#[derive(Clone)] pub struct PayloadVerificationOutcome { pub payload_verification_status: PayloadVerificationStatus, pub is_valid_merge_transition_block: bool, @@ -619,7 +622,8 @@ pub fn signature_verify_chain_segment( /// A wrapper around a `SignedBeaconBlock` that indicates it has been approved for re-gossiping on /// the p2p network. -#[derive(Debug)] +#[derive(Derivative)] +#[derivative(Debug(bound = "T: BeaconChainTypes"))] pub struct GossipVerifiedBlock { pub block: BlockWrapper, pub block_root: Hash256, @@ -663,14 +667,15 @@ pub struct ExecutionPendingBlock { pub payload_verification_handle: PayloadVerificationHandle, } -pub struct ExecutedBlock { - pub block: BlockWrapper, +#[derive(Clone)] +pub struct ExecutedBlock { + pub block: BlockWrapper, pub block_root: Hash256, - pub state: BeaconState, - pub parent_block: SignedBeaconBlock>, + pub state: BeaconState, + pub parent_block: SignedBeaconBlock>, pub parent_eth1_finalization_data: Eth1FinalizationData, pub confirmed_state_roots: Vec, - pub consensus_context: ConsensusContext, + pub consensus_context: ConsensusContext, pub payload_verification_outcome: PayloadVerificationOutcome, } @@ -1156,7 +1161,7 @@ impl IntoExecutionPendingBlock for BlockWrapper block .into_execution_pending_block_slashable(block_root, chain, notify_execution_layer), BlockWrapper::Available(AvailableBlock { block, blobs }) => { - let execution_pending_block = block.into_execution_pending_block_slashable( + let mut execution_pending_block = block.into_execution_pending_block_slashable( block_root, chain, notify_execution_layer, diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 2d92ad6cb1..36a6a33796 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -1,9 +1,11 @@ use crate::beacon_chain::{CanonicalHead, BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, OP_POOL_DB_KEY}; use crate::blob_cache::BlobCache; +use crate::block_verification::ExecutedBlock; use crate::eth1_chain::{CachingEth1Backend, SszEth1}; use crate::eth1_finalization_cache::Eth1FinalizationCache; use crate::fork_choice_signal::ForkChoiceSignalTx; use crate::fork_revert::{reset_fork_choice_to_finalization, revert_to_fork_boundary}; +use crate::gossip_blob_cache::DataAvailabilityChecker; use crate::head_tracker::HeadTracker; use crate::migrate::{BackgroundMigrator, MigratorConfig}; use crate::persisted_beacon_chain::PersistedBeaconChain; @@ -85,6 +87,7 @@ pub struct BeaconChainBuilder { event_handler: Option>, slot_clock: Option, shutdown_sender: Option>, + block_importer_sender: Option>>, head_tracker: Option, validator_pubkey_cache: Option>, spec: ChainSpec, @@ -127,6 +130,7 @@ where event_handler: None, slot_clock: None, shutdown_sender: None, + block_importer_sender: None, head_tracker: None, validator_pubkey_cache: None, spec: TEthSpec::default_spec(), @@ -558,6 +562,14 @@ where self } + pub fn block_importer_sender( + mut self, + sender: tokio::sync::mpsc::Sender>, + ) -> Self { + self.block_importer_sender = Some(sender); + self + } + /// Creates a new, empty operation pool. fn empty_op_pool(mut self) -> Self { self.op_pool = Some(OperationPool::new()); @@ -641,12 +653,18 @@ where slot_clock.now().ok_or("Unable to read slot")? }; - let kzg = if let Some(trusted_setup) = self.trusted_setup { + let (kzg, data_availability_checker) = if let (Some(tx), Some(trusted_setup)) = + (self.block_importer_sender, self.trusted_setup) + { let kzg = Kzg::new_from_trusted_setup(trusted_setup) .map_err(|e| format!("Failed to load trusted setup: {:?}", e))?; - Some(Arc::new(kzg)) + let kzg_arc = Arc::new(kzg); + ( + Some(kzg_arc.clone()), + Some(DataAvailabilityChecker::new(kzg_arc, tx)), + ) } else { - None + (None, None) }; let initial_head_block_root = fork_choice @@ -850,7 +868,9 @@ where graffiti: self.graffiti, slasher: self.slasher.clone(), validator_monitor: RwLock::new(validator_monitor), - blob_cache: BlobCache::default(), + //TODO(sean) should we move kzg solely to the da checker? + data_availability_checker, + proposal_blob_cache: BlobCache::default(), kzg, }; diff --git a/beacon_node/beacon_chain/src/execution_payload.rs b/beacon_node/beacon_chain/src/execution_payload.rs index bb2d3c5b44..b561b6ea1e 100644 --- a/beacon_node/beacon_chain/src/execution_payload.rs +++ b/beacon_node/beacon_chain/src/execution_payload.rs @@ -134,9 +134,9 @@ impl PayloadNotifier { /// contains a few extra checks by running `partially_verify_execution_payload` first: /// /// https://github.com/ethereum/consensus-specs/blob/v1.1.9/specs/bellatrix/beacon-chain.md#notify_new_payload -async fn notify_new_payload( +async fn notify_new_payload<'a, T: BeaconChainTypes>( chain: &Arc>, - block: BeaconBlockRef, + block: BeaconBlockRef<'a, T::EthSpec>, ) -> Result> { let execution_payload = block.execution_payload()?; diff --git a/beacon_node/beacon_chain/src/gossip_blob_cache.rs b/beacon_node/beacon_chain/src/gossip_blob_cache.rs index f4f2a77f0b..ed12970ec6 100644 --- a/beacon_node/beacon_chain/src/gossip_blob_cache.rs +++ b/beacon_node/beacon_chain/src/gossip_blob_cache.rs @@ -1,7 +1,7 @@ -use crate::blob_verification::verify_data_availability; +use crate::blob_verification::{verify_data_availability, AsBlock}; use crate::block_verification::{ExecutedBlock, IntoExecutionPendingBlock}; use crate::kzg_utils::validate_blob; -use crate::{BeaconChainError, BeaconChainTypes, BlockError}; +use crate::{BeaconChain, BeaconChainError, BeaconChainTypes, BlockError}; use eth2::reqwest::header::Entry; use kzg::Error as KzgError; use kzg::{Kzg, KzgCommitment}; @@ -9,9 +9,10 @@ use parking_lot::{Mutex, RwLock}; use ssz_types::VariableList; use std::collections::{BTreeMap, HashMap, HashSet}; use std::future::Future; -use std::sync::Arc; +use std::sync::{mpsc, Arc}; +use tokio::sync::mpsc::Sender; use types::blob_sidecar::{BlobIdentifier, BlobSidecar}; -use types::{EthSpec, Hash256}; +use types::{EthSpec, Hash256, SignedBeaconBlock}; pub enum BlobCacheError { DuplicateBlob(Hash256), @@ -22,23 +23,25 @@ pub enum BlobCacheError { /// - commitments for blocks that have been gossip verified, but the commitments themselves /// have not been verified against blobs /// - blocks that have been fully verified and only require a data availability check -pub struct GossipBlobCache { - rpc_blob_cache: RwLock>>>, - gossip_blob_cache: Mutex>>, - kzg: Kzg, +pub struct DataAvailabilityChecker { + rpc_blob_cache: RwLock>>>, + gossip_blob_cache: Mutex>>, + kzg: Arc, + tx: Sender>, } -struct GossipBlobCacheInner { - verified_blobs: Vec>>, +struct GossipBlobCache { + verified_blobs: Vec>>, executed_block: Option>, } -impl GossipBlobCache { - pub fn new(kzg: Kzg) -> Self { +impl DataAvailabilityChecker { + pub fn new(kzg: Arc, tx: Sender>) -> Self { Self { - rpc_blob_cache: RwLock::new(HashMap::new()), - gossip_blob_cache: Mutex::new(HashMap::new()), + rpc_blob_cache: <_>::default(), + gossip_blob_cache: <_>::default(), kzg, + tx, } } @@ -47,9 +50,9 @@ impl GossipBlobCache { /// cached, verify the block and import it. /// /// This should only accept gossip verified blobs, so we should not have to worry about dupes. - pub fn put_blob(&self, blob: Arc>) -> Result<(), BlobCacheError> { + pub fn put_blob(&self, blob: Arc>) -> Result<(), BlobCacheError> { // TODO(remove clones) - let verified = validate_blob::( + let verified = validate_blob::( &self.kzg, blob.blob.clone(), blob.kzg_commitment.clone(), @@ -70,11 +73,33 @@ impl GossipBlobCache { .verified_blobs .insert(blob.index as usize, blob.clone()); - if let Some(executed_block) = inner.executed_block.as_ref() { - // trigger reprocessing ? + if let Some(executed_block) = inner.executed_block.take() { + let verified_commitments: Vec<_> = inner + .verified_blobs + .iter() + .map(|blob| blob.kzg_commitment) + .collect(); + if verified_commitments + == executed_block + .block + .as_block() + .message_eip4844() + .unwrap() //TODO(sean) errors + .body + .blob_kzg_commitments + .clone() + .to_vec() + { + // send to reprocessing queue ? + //TODO(sean) try_send? + //TODO(sean) errors + self.tx.try_send(executed_block); + } else { + let _ = inner.executed_block.insert(executed_block); + } } }) - .or_insert(GossipBlobCacheInner { + .or_insert(GossipBlobCache { verified_blobs: vec![blob.clone()], executed_block: None, }); @@ -93,24 +118,26 @@ impl GossipBlobCache { guard .entry(executed_block.block_root) .and_modify(|cache| { - if let Ok(block) = executed_block.block.message_eip4844() { - let verified_commitments_vec: Vec<_> = cache + let block: &SignedBeaconBlock = executed_block.block.as_block(); + if let Ok(block) = block.message_eip4844() { + let verified_commitments: Vec<_> = cache .verified_blobs .iter() .map(|blob| blob.kzg_commitment) .collect(); - let verified_commitments = VariableList::from(verified_commitments_vec); - if verified_commitments == block.body.blob_kzg_commitments { + if verified_commitments == block.body.blob_kzg_commitments.clone().to_vec() { // send to reprocessing queue ? + //TODO(sean) errors + self.tx.try_send(executed_block.clone()); } else { - let _ = cache.executed_block.insert(executed_block); + let _ = cache.executed_block.insert(executed_block.clone()); // log that we cached } } else { // log error } }) - .or_insert(GossipBlobCacheInner { + .or_insert(GossipBlobCache { verified_blobs: vec![], executed_block: Some(executed_block), }); diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 865539a7b6..f1779b45fb 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -66,7 +66,7 @@ pub use self::historical_blocks::HistoricalBlockError; pub use attestation_verification::Error as AttestationError; pub use beacon_fork_choice_store::{BeaconForkChoiceStore, Error as ForkChoiceStoreError}; pub use block_verification::{ - get_block_root, BlockError, ExecutionPayloadError, GossipVerifiedBlock, + get_block_root, BlockError, ExecutedBlock, ExecutionPayloadError, GossipVerifiedBlock, }; pub use canonical_head::{CachedHead, CanonicalHead, CanonicalHeadRwLock}; pub use eth1_chain::{Eth1Chain, Eth1ChainBackend}; diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index e80b6fd18c..5d283eae61 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -862,6 +862,9 @@ where .ok_or("beacon_chain requires a runtime context")? .clone(); + let (tx, mut rx) = + tokio::sync::mpsc::channel::>(1000); //TODO(sean) capacity + let chain = self .beacon_chain_builder .ok_or("beacon_chain requires a beacon_chain_builder")? @@ -871,10 +874,14 @@ where .ok_or("beacon_chain requires a slot clock")?, ) .shutdown_sender(context.executor.shutdown_sender()) + .block_importer_sender(tx) .build() .map_err(|e| format!("Failed to build beacon chain: {}", e))?; - self.beacon_chain = Some(Arc::new(chain)); + let arc_chain = Arc::new(chain); + arc_chain.start_block_importer(rx); + + self.beacon_chain = Some(arc_chain); self.beacon_chain_builder = None; // a beacon chain requires a timer diff --git a/beacon_node/http_api/src/block_id.rs b/beacon_node/http_api/src/block_id.rs index b484f4079a..d53bd793e8 100644 --- a/beacon_node/http_api/src/block_id.rs +++ b/beacon_node/http_api/src/block_id.rs @@ -4,7 +4,7 @@ use eth2::types::BlockId as CoreBlockId; use std::fmt; use std::str::FromStr; use std::sync::Arc; -use types::{BlobsSidecar, Hash256, SignedBeaconBlock, SignedBlindedBeaconBlock, Slot}; +use types::{BlobSidecar, Hash256, SignedBeaconBlock, SignedBlindedBeaconBlock, Slot}; /// Wraps `eth2::types::BlockId` and provides a simple way to obtain a block or root for a given /// `BlockId`. @@ -216,13 +216,13 @@ impl BlockId { pub async fn blobs_sidecar( &self, chain: &BeaconChain, - ) -> Result>, warp::Rejection> { + ) -> Result>, warp::Rejection> { let root = self.root(chain)?.0; let Some(data_availability_boundary) = chain.data_availability_boundary() else { return Err(warp_utils::reject::custom_not_found("Eip4844 fork disabled".into())); }; - match chain.get_blobs(&root, data_availability_boundary) { - Ok(Some(blob)) => Ok(Arc::new(blob)), + match chain.get_blobs(&root) { + Ok(Some(blob)) => todo!(), // Jimmy's PR will fix this, Ok(None) => Err(warp_utils::reject::custom_not_found(format!( "Blob with block root {} is not in the store", root diff --git a/beacon_node/http_api/src/build_block_contents.rs b/beacon_node/http_api/src/build_block_contents.rs index 9fbde0ce06..6512197ef6 100644 --- a/beacon_node/http_api/src/build_block_contents.rs +++ b/beacon_node/http_api/src/build_block_contents.rs @@ -16,7 +16,7 @@ pub fn build_block_contents { let block_root = &block.canonical_root(); - if let Some(blob_sidecars) = chain.blob_cache.pop(block_root) { + if let Some(blob_sidecars) = chain.proposal_blob_cache.pop(block_root) { let block_and_blobs = BeaconBlockAndBlobSidecars { block, blob_sidecars, diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index dc8bb020ac..b9db262dd2 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -67,23 +67,23 @@ pub async fn publish_block( let delay = get_block_delay_ms(seen_timestamp, block.message(), &chain.slot_clock); metrics::observe_duration(&metrics::HTTP_API_BLOCK_BROADCAST_DELAY_TIMES, delay); - let available_block = match wrapped_block.into_available_block(block_root, &chain) { - Ok(available_block) => available_block, - Err(e) => { - let msg = format!("{:?}", e); + let available_block = match wrapped_block.clone().into_available_block() { + Some(available_block) => available_block, + None => { error!( log, - "Invalid block provided to HTTP API"; - "reason" => &msg + "Invalid block provided to HTTP API unavailable block"; //TODO(sean) probably want a real error here ); - return Err(warp_utils::reject::broadcast_without_import(msg)); + return Err(warp_utils::reject::broadcast_without_import( + "unavailable block".to_string(), + )); } }; match chain .process_block( block_root, - available_block.clone(), + wrapped_block, CountUnrealized::True, NotifyExecutionLayer::Yes, ) @@ -95,14 +95,14 @@ pub async fn publish_block( "Valid block from HTTP API"; "block_delay" => ?delay, "root" => format!("{}", root), - "proposer_index" => available_block.message().proposer_index(), - "slot" => available_block.slot(), + "proposer_index" => available_block.block.message().proposer_index(), + "slot" => available_block.block.slot(), ); // Notify the validator monitor. chain.validator_monitor.read().register_api_block( seen_timestamp, - available_block.message(), + available_block.block.message(), root, &chain.slot_clock, ); @@ -124,7 +124,7 @@ pub async fn publish_block( "Block was broadcast too late"; "msg" => "system may be overloaded, block likely to be orphaned", "delay_ms" => delay.as_millis(), - "slot" => available_block.slot(), + "slot" => available_block.block.slot(), "root" => ?root, ) } else if delay >= delayed_threshold { @@ -133,7 +133,7 @@ pub async fn publish_block( "Block broadcast was delayed"; "msg" => "system may be overloaded, block may be orphaned", "delay_ms" => delay.as_millis(), - "slot" => available_block.slot(), + "slot" => available_block.block.slot(), "root" => ?root, ) } @@ -145,7 +145,7 @@ pub async fn publish_block( log, "Block from HTTP API already known"; "block" => ?block_root, - "slot" => available_block.slot(), + "slot" => available_block.block.slot(), ); Ok(()) } diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 0b64725544..3d56f53b02 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -802,6 +802,10 @@ impl Worker { verified_block } + Err(BlockError::AvailabilityPending(_)) => { + //TODO(sean) think about what to do hereA + todo!() + } Err(BlockError::ParentUnknown(block)) => { debug!( self.log, @@ -985,7 +989,7 @@ impl Worker { ) .await { - Ok(BlockProcessingResult::Verified(block_root)) => { + Ok(block_root) => { metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL); if reprocess_tx @@ -1012,8 +1016,8 @@ impl Worker { self.chain.recompute_head_at_current_slot().await; } - Ok(BlockProcessingResult::AvailabilityPending(executed_block)) => { - // cache in blob cache and make rpc request for blob + Err(BlockError::AvailabilityPending(block_root)) => { + // make rpc request for blob } Err(BlockError::ParentUnknown(block)) => { // Inform the sync manager to find parents for this block diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 78b9de303f..a42b56794a 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -233,36 +233,37 @@ impl Worker { .get_block_and_blobs_checking_early_attester_cache(&root) .await { - Ok(Some(block_and_blobs)) => { - // - // TODO: HORRIBLE NSFW CODE AHEAD - // - let types::SignedBeaconBlockAndBlobsSidecar {beacon_block, blobs_sidecar} = block_and_blobs; - let types::BlobsSidecar{ beacon_block_root, beacon_block_slot, blobs: blob_bundle, kzg_aggregated_proof }: types::BlobsSidecar<_> = blobs_sidecar.as_ref().clone(); - // TODO: this should be unreachable after this is addressed seriously, - // so for now let's be ok with a panic in the expect. - let block = beacon_block.message_eip4844().expect("We fucked up the block blob stuff"); - // Intentionally not accessing the list directly - for (known_index, blob) in blob_bundle.into_iter().enumerate() { - if (known_index as u64) == index { - let blob_sidecar = types::BlobSidecar{ - block_root: beacon_block_root, - index, - slot: beacon_block_slot, - block_parent_root: block.parent_root, - proposer_index: block.proposer_index, - blob, - kzg_commitment: block.body.blob_kzg_commitments[known_index], // TODO: needs to be stored in a more logical way so that this won't panic. - kzg_proof: kzg_aggregated_proof // TODO: yeah - }; - self.send_response( - peer_id, - Response::BlobsByRoot(Some(Arc::new(blob_sidecar))), - request_id, - ); - send_block_count += 1; - } - } + Ok(Some(())) => { + todo!(); + // // + // // TODO: HORRIBLE NSFW CODE AHEAD + // // + // let types::SignedBeaconBlockAndBlobsSidecar {beacon_block, blobs_sidecar} = block_and_blobs; + // let types::BlobsSidecar{ beacon_block_root, beacon_block_slot, blobs: blob_bundle, kzg_aggregated_proof }: types::BlobsSidecar<_> = blobs_sidecar.as_ref().clone(); + // // TODO: this should be unreachable after this is addressed seriously, + // // so for now let's be ok with a panic in the expect. + // let block = beacon_block.message_eip4844().expect("We fucked up the block blob stuff"); + // // Intentionally not accessing the list directly + // for (known_index, blob) in blob_bundle.into_iter().enumerate() { + // if (known_index as u64) == index { + // let blob_sidecar = types::BlobSidecar{ + // block_root: beacon_block_root, + // index, + // slot: beacon_block_slot, + // block_parent_root: block.parent_root, + // proposer_index: block.proposer_index, + // blob, + // kzg_commitment: block.body.blob_kzg_commitments[known_index], // TODO: needs to be stored in a more logical way so that this won't panic. + // kzg_proof: kzg_aggregated_proof // TODO: yeah + // }; + // self.send_response( + // peer_id, + // Response::BlobsByRoot(Some(Arc::new(blob_sidecar))), + // request_id, + // ); + // send_block_count += 1; + // } + // } } Ok(None) => { debug!( @@ -836,35 +837,36 @@ impl Worker { let mut send_response = true; for root in block_roots { - match self.chain.get_blobs(&root, data_availability_boundary) { + match self.chain.get_blobs(&root) { Ok(Some(blobs)) => { - // TODO: more GROSS code ahead. Reader beware - let types::BlobsSidecar { - beacon_block_root, - beacon_block_slot, - blobs: blob_bundle, - kzg_aggregated_proof: _, - }: types::BlobsSidecar<_> = blobs; - - for (blob_index, blob) in blob_bundle.into_iter().enumerate() { - let blob_sidecar = types::BlobSidecar { - block_root: beacon_block_root, - index: blob_index as u64, - slot: beacon_block_slot, - block_parent_root: Hash256::zero(), - proposer_index: 0, - blob, - kzg_commitment: types::KzgCommitment::default(), - kzg_proof: types::KzgProof::default(), - }; - - blobs_sent += 1; - self.send_network_message(NetworkMessage::SendResponse { - peer_id, - response: Response::BlobsByRange(Some(Arc::new(blob_sidecar))), - id: request_id, - }); - } + todo!(); + // // TODO: more GROSS code ahead. Reader beware + // let types::BlobsSidecar { + // beacon_block_root, + // beacon_block_slot, + // blobs: blob_bundle, + // kzg_aggregated_proof: _, + // }: types::BlobsSidecar<_> = blobs; + // + // for (blob_index, blob) in blob_bundle.into_iter().enumerate() { + // let blob_sidecar = types::BlobSidecar { + // block_root: beacon_block_root, + // index: blob_index as u64, + // slot: beacon_block_slot, + // block_parent_root: Hash256::zero(), + // proposer_index: 0, + // blob, + // kzg_commitment: types::KzgCommitment::default(), + // kzg_proof: types::KzgProof::default(), + // }; + // + // blobs_sent += 1; + // self.send_network_message(NetworkMessage::SendResponse { + // peer_id, + // response: Response::BlobsByRange(Some(Arc::new(blob_sidecar))), + // id: request_id, + // }); + // } } Ok(None) => { error!( diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs index 093e69729b..5bbcb4e1f3 100644 --- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -86,27 +86,16 @@ impl Worker { }; let slot = block.slot(); let parent_root = block.message().parent_root(); - let available_block = block - .into_available_block(block_root, &self.chain) - .map_err(BlockError::BlobValidation); - let result = match available_block { - Ok(BlockProcessingResult::Verified(block)) => { - self.chain - .process_block( - block_root, - block, - CountUnrealized::True, - NotifyExecutionLayer::Yes, - ) - .await - } - Ok(BlockProcessingResult::AvailabilityPending(executed_block)) => { - // Shouldn't happen as sync should only send blocks for processing - // after sending blocks into the availability cache. - } - Err(e) => Err(e), - }; + let result = self + .chain + .process_block( + block_root, + block, + CountUnrealized::True, + NotifyExecutionLayer::Yes, + ) + .await; metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL); diff --git a/consensus/state_processing/src/consensus_context.rs b/consensus/state_processing/src/consensus_context.rs index 4c8966f92c..37bd5fe446 100644 --- a/consensus/state_processing/src/consensus_context.rs +++ b/consensus/state_processing/src/consensus_context.rs @@ -7,7 +7,7 @@ use types::{ ChainSpec, Epoch, EthSpec, Hash256, IndexedAttestation, SignedBeaconBlock, Slot, }; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ConsensusContext { /// Slot to act as an identifier/safeguard slot: Slot, diff --git a/lcli/src/parse_ssz.rs b/lcli/src/parse_ssz.rs index 0e87e330b1..70f34e09e3 100644 --- a/lcli/src/parse_ssz.rs +++ b/lcli/src/parse_ssz.rs @@ -63,7 +63,7 @@ pub fn run_parse_ssz(matches: &ArgMatches) -> Result<(), String> { "state_merge" => decode_and_print::>(&bytes, format)?, "state_capella" => decode_and_print::>(&bytes, format)?, "state_eip4844" => decode_and_print::>(&bytes, format)?, - "blobs_sidecar" => decode_and_print::>(&bytes, format)?, + "blob_sidecar" => decode_and_print::>(&bytes, format)?, other => return Err(format!("Unknown type: {}", other)), }; diff --git a/testing/ef_tests/src/type_name.rs b/testing/ef_tests/src/type_name.rs index 19b3535fbf..d94dfef485 100644 --- a/testing/ef_tests/src/type_name.rs +++ b/testing/ef_tests/src/type_name.rs @@ -50,7 +50,7 @@ type_name_generic!(BeaconBlockBodyCapella, "BeaconBlockBody"); type_name_generic!(BeaconBlockBodyEip4844, "BeaconBlockBody"); type_name!(BeaconBlockHeader); type_name_generic!(BeaconState); -type_name_generic!(BlobsSidecar); +type_name_generic!(BlobSidecar); type_name!(Checkpoint); type_name_generic!(ContributionAndProof); type_name!(Deposit); @@ -88,9 +88,5 @@ type_name!(Validator); type_name!(VoluntaryExit); type_name!(Withdrawal); type_name!(BlsToExecutionChange, "BLSToExecutionChange"); -type_name_generic!( - SignedBeaconBlockAndBlobsSidecarDecode, - "SignedBeaconBlockAndBlobsSidecar" -); type_name!(SignedBlsToExecutionChange, "SignedBLSToExecutionChange"); type_name!(HistoricalSummary);