diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 016bda13ab..00be8a25c9 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -959,35 +959,20 @@ impl BeaconChain { Ok(self.get_block(block_root).await?.map(Arc::new)) } - pub async fn get_block_and_blobs_checking_early_attester_cache( + pub async fn get_blobs_checking_early_attester_cache( &self, block_root: &Hash256, - ) -> Result>, Error> { + ) -> Result>, Error> { // If there is no data availability boundary, the Eip4844 fork is disabled. if let Some(finalized_data_availability_boundary) = self.finalized_data_availability_boundary() { - // Only use the attester cache if we can find both the block and blob - if let (Some(block), Some(blobs)) = ( - self.early_attester_cache.get_block(*block_root), - self.early_attester_cache.get_blobs(*block_root), - ) { - Ok(Some(SignedBeaconBlockAndBlobsSidecar { - beacon_block: block, - blobs_sidecar: blobs, - })) - // Attempt to get the block and blobs from the database - } else if let Some(block) = self.get_block(block_root).await?.map(Arc::new) { - let blobs = self - .get_blobs(block_root, finalized_data_availability_boundary)? - .map(Arc::new); - Ok(blobs.map(|blobs| SignedBeaconBlockAndBlobsSidecar { - beacon_block: block, - blobs_sidecar: blobs, - })) - } else { - Ok(None) - } + self.early_attester_cache + .get_blobs(*block_root) + .map_or_else( + || self.get_blobs(block_root, finalized_data_availability_boundary), + |blobs| Ok(Some(blobs)), + ) } else { Ok(None) } @@ -1057,23 +1042,6 @@ impl BeaconChain { .map(Some) } - // FIXME(jimmy): temporary method added to unblock API work. This method will be replaced by - // the `get_blobs` method below once the new blob sidecar structure (`BlobSidecarList`) is - // implemented in that method. - #[allow(clippy::type_complexity)] // FIXME: this will be fixed by the `BlobSidecarList` alias in Sean's PR - pub fn get_blob_sidecar_list( - &self, - _block_root: &Hash256, - _data_availability_boundary: Epoch, - ) -> Result< - Option< - VariableList>, ::MaxBlobsPerBlock>, - >, - Error, - > { - unimplemented!("update to use the updated `get_blobs` method instead once this PR is merged: https://github.com/sigp/lighthouse/pull/4104") - } - /// Returns the blobs at the given root, if any. /// /// Returns `Ok(None)` if the blobs and associated block are not found. @@ -1091,9 +1059,9 @@ impl BeaconChain { &self, block_root: &Hash256, data_availability_boundary: Epoch, - ) -> Result>, Error> { + ) -> Result>, Error> { match self.store.get_blobs(block_root)? { - Some(blobs) => Ok(Some(blobs)), + Some(blob_sidecar_list) => Ok(Some(blob_sidecar_list)), None => { // Check for the corresponding block to understand whether we *should* have blobs. self.get_blinded_block(block_root)? @@ -1106,7 +1074,8 @@ impl BeaconChain { Err(_) => return Err(Error::NoKzgCommitmentsFieldOnBlock), }; if expected_kzg_commitments.is_empty() { - Ok(BlobsSidecar::empty_from_parts(*block_root, block.slot())) + // TODO (mark): verify this + Ok(BlobSidecarList::empty()) } else if data_availability_boundary <= block.epoch() { // We should have blobs for all blocks younger than the boundary. Err(Error::BlobsUnavailable) @@ -3052,7 +3021,7 @@ impl BeaconChain { // margin, or younger (of higher epoch number). if block_epoch >= import_boundary { if let Some(blobs) = blobs { - if !blobs.blobs.is_empty() { + if !blobs.is_empty() { //FIXME(sean) using this for debugging for now info!( self.log, "Writing blobs to store"; @@ -4814,7 +4783,7 @@ impl BeaconChain { ) .map_err(BlockProductionError::KzgError)?; - let blob_sidecars = VariableList::from( + let blob_sidecars = BlobSidecarList::from( blobs .into_iter() .enumerate() @@ -4827,7 +4796,7 @@ impl BeaconChain { .get(blob_index) .expect("KZG proof should exist for blob"); - Ok(BlobSidecar { + Ok(Arc::new(BlobSidecar { block_root: beacon_block_root, index: blob_index as u64, slot, @@ -4836,9 +4805,9 @@ impl BeaconChain { blob, kzg_commitment: *kzg_commitment, kzg_proof: *kzg_proof, - }) + })) }) - .collect::>, BlockProductionError>>()?, + .collect::, BlockProductionError>>()?, ); self.blob_cache.put(beacon_block_root, blob_sidecars); diff --git a/beacon_node/beacon_chain/src/early_attester_cache.rs b/beacon_node/beacon_chain/src/early_attester_cache.rs index dd4109da9b..5fe14c7e25 100644 --- a/beacon_node/beacon_chain/src/early_attester_cache.rs +++ b/beacon_node/beacon_chain/src/early_attester_cache.rs @@ -21,7 +21,7 @@ pub struct CacheItem { * Values used to make the block available. */ block: Arc>, - blobs: Option>>, + blobs: Option>, proto_block: ProtoBlock, } @@ -160,7 +160,7 @@ impl EarlyAttesterCache { } /// Returns the blobs, if `block_root` matches the cached item. - pub fn get_blobs(&self, block_root: Hash256) -> Option>> { + pub fn get_blobs(&self, block_root: Hash256) -> Option> { self.item .read() .as_ref() diff --git a/beacon_node/http_api/src/block_id.rs b/beacon_node/http_api/src/block_id.rs index 9183437f99..ef7affeb7f 100644 --- a/beacon_node/http_api/src/block_id.rs +++ b/beacon_node/http_api/src/block_id.rs @@ -1,10 +1,10 @@ use crate::{state_id::checkpoint_slot_and_execution_optimistic, ExecutionOptimistic}; use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes, WhenSlotSkipped}; -use eth2::types::{BlockId as CoreBlockId, VariableList}; +use eth2::types::BlockId as CoreBlockId; use std::fmt; use std::str::FromStr; use std::sync::Arc; -use types::{BlobSidecar, EthSpec, Hash256, SignedBeaconBlock, SignedBlindedBeaconBlock, Slot}; +use types::{BlobSidecarList, Hash256, SignedBeaconBlock, SignedBlindedBeaconBlock, Slot}; /// Wraps `eth2::types::BlockId` and provides a simple way to obtain a block or root for a given /// `BlockId`. @@ -216,16 +216,13 @@ impl BlockId { pub async fn blob_sidecar_list( &self, chain: &BeaconChain, - ) -> Result< - VariableList>, ::MaxBlobsPerBlock>, - warp::Rejection, - > { + ) -> Result, warp::Rejection> { let root = self.root(chain)?.0; let Some(data_availability_boundary) = chain.data_availability_boundary() else { return Err(warp_utils::reject::custom_not_found("Deneb fork disabled".into())); }; - match chain.get_blob_sidecar_list(&root, data_availability_boundary) { - Ok(Some(blobs)) => Ok(blobs), + match chain.get_blobs(&root, data_availability_boundary) { + Ok(Some(blob_sidecar_list)) => Ok(blob_sidecar_list), Ok(None) => Err(warp_utils::reject::custom_not_found(format!( "No blobs with block root {} found in the store", root diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 78b9de303f..565b1ce886 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -12,6 +12,7 @@ use lighthouse_network::rpc::*; use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo}; use slog::{debug, error, warn}; use slot_clock::SlotClock; +use std::collections::{hash_map::Entry, HashMap}; use std::sync::Arc; use task_executor::TaskExecutor; use types::blob_sidecar::BlobIdentifier; @@ -225,42 +226,34 @@ impl Worker { executor.spawn( async move { let requested_blobs = request.blob_ids.len(); - let mut send_block_count = 0; + let mut send_blob_count = 0; let mut send_response = true; + + let mut blob_list_results = HashMap::new(); for BlobIdentifier{ block_root: root, index } in request.blob_ids.into_iter() { - match self - .chain - .get_block_and_blobs_checking_early_attester_cache(&root) - .await - { - Ok(Some(block_and_blobs)) => { - // - // TODO: HORRIBLE NSFW CODE AHEAD - // - let types::SignedBeaconBlockAndBlobsSidecar {beacon_block, blobs_sidecar} = block_and_blobs; - let types::BlobsSidecar{ beacon_block_root, beacon_block_slot, blobs: blob_bundle, kzg_aggregated_proof }: types::BlobsSidecar<_> = blobs_sidecar.as_ref().clone(); - // TODO: this should be unreachable after this is addressed seriously, - // so for now let's be ok with a panic in the expect. - let block = beacon_block.message_eip4844().expect("We fucked up the block blob stuff"); - // Intentionally not accessing the list directly - for (known_index, blob) in blob_bundle.into_iter().enumerate() { - if (known_index as u64) == index { - let blob_sidecar = types::BlobSidecar{ - block_root: beacon_block_root, - index, - slot: beacon_block_slot, - block_parent_root: block.parent_root, - proposer_index: block.proposer_index, - blob, - kzg_commitment: block.body.blob_kzg_commitments[known_index], // TODO: needs to be stored in a more logical way so that this won't panic. - kzg_proof: kzg_aggregated_proof // TODO: yeah - }; + let blob_list_result = match blob_list_results.entry(root) { + Entry::Vacant(entry) => { + entry.insert(self + .chain + .get_blobs_checking_early_attester_cache(&root) + .await) + } + Entry::Occupied(entry) => { + entry.into_mut() + } + }; + + match blob_list_result.as_ref() { + Ok(Some(blobs_sidecar_list)) => { + for blob_sidecar in blobs_sidecar_list.iter() { + if blob_sidecar.index == index { self.send_response( peer_id, - Response::BlobsByRoot(Some(Arc::new(blob_sidecar))), + Response::BlobsByRoot(Some(blob_sidecar.clone())), request_id, ); - send_block_count += 1; + send_blob_count += 1; + break; } } } @@ -355,7 +348,7 @@ impl Worker { "Received BlobsByRoot Request"; "peer" => %peer_id, "requested" => requested_blobs, - "returned" => send_block_count + "returned" => send_blob_count ); // send stream termination @@ -837,31 +830,12 @@ impl Worker { for root in block_roots { match self.chain.get_blobs(&root, data_availability_boundary) { - Ok(Some(blobs)) => { - // TODO: more GROSS code ahead. Reader beware - let types::BlobsSidecar { - beacon_block_root, - beacon_block_slot, - blobs: blob_bundle, - kzg_aggregated_proof: _, - }: types::BlobsSidecar<_> = blobs; - - for (blob_index, blob) in blob_bundle.into_iter().enumerate() { - let blob_sidecar = types::BlobSidecar { - block_root: beacon_block_root, - index: blob_index as u64, - slot: beacon_block_slot, - block_parent_root: Hash256::zero(), - proposer_index: 0, - blob, - kzg_commitment: types::KzgCommitment::default(), - kzg_proof: types::KzgProof::default(), - }; - + Ok(Some(blob_sidecar_list)) => { + for blob_sidecar in blob_sidecar_list.iter() { blobs_sent += 1; self.send_network_message(NetworkMessage::SendResponse { peer_id, - response: Response::BlobsByRange(Some(Arc::new(blob_sidecar))), + response: Response::BlobsByRange(Some(blob_sidecar.clone())), id: request_id, }); } diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 60e2f77595..2c80c2c1ac 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -66,7 +66,7 @@ pub struct HotColdDB, Cold: ItemStore> { /// The hot database also contains all blocks. pub hot_db: Hot, /// LRU cache of deserialized blobs. Updated whenever a blob is loaded. - blob_cache: Mutex>>, + blob_cache: Mutex>>, /// LRU cache of deserialized blocks. Updated whenever a block is loaded. block_cache: Mutex>>, /// Chain spec. @@ -568,7 +568,7 @@ impl, Cold: ItemStore> HotColdDB blobs_db.key_delete(DBColumn::BeaconBlob.into(), block_root.as_bytes()) } - pub fn put_blobs(&self, block_root: &Hash256, blobs: BlobsSidecar) -> Result<(), Error> { + pub fn put_blobs(&self, block_root: &Hash256, blobs: BlobSidecarList) -> Result<(), Error> { let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db); blobs_db.put_bytes( DBColumn::BeaconBlob.into(), @@ -582,7 +582,7 @@ impl, Cold: ItemStore> HotColdDB pub fn blobs_as_kv_store_ops( &self, key: &Hash256, - blobs: &BlobsSidecar, + blobs: BlobSidecarList, ops: &mut Vec, ) { let db_key = get_key_for_col(DBColumn::BeaconBlob.into(), key.as_bytes()); @@ -817,7 +817,7 @@ impl, Cold: ItemStore> HotColdDB } StoreOp::PutBlobs(block_root, blobs) => { - self.blobs_as_kv_store_ops(&block_root, &blobs, &mut key_value_batch); + self.blobs_as_kv_store_ops(&block_root, blobs, &mut key_value_batch); } StoreOp::PutStateSummary(state_root, summary) => { @@ -885,8 +885,8 @@ impl, Cold: ItemStore> HotColdDB StoreOp::PutBlobs(_, _) => true, StoreOp::DeleteBlobs(block_root) => { match self.get_blobs(block_root) { - Ok(Some(blobs_sidecar)) => { - blobs_to_delete.push(blobs_sidecar); + Ok(Some(blobs_sidecar_list)) => { + blobs_to_delete.push((*block_root, blobs_sidecar_list)); } Err(e) => { error!( @@ -926,7 +926,7 @@ impl, Cold: ItemStore> HotColdDB let reverse_op = match op { StoreOp::PutBlobs(block_root, _) => StoreOp::DeleteBlobs(*block_root), StoreOp::DeleteBlobs(_) => match blobs_to_delete.pop() { - Some(blobs) => StoreOp::PutBlobs(blobs.beacon_block_root, Arc::new(blobs)), + Some((block_root, blobs)) => StoreOp::PutBlobs(block_root, blobs), None => return Err(HotColdDBError::Rollback.into()), }, _ => return Err(HotColdDBError::Rollback.into()), @@ -972,7 +972,7 @@ impl, Cold: ItemStore> HotColdDB for op in blob_cache_ops { match op { StoreOp::PutBlobs(block_root, blobs) => { - guard_blob.put(block_root, (*blobs).clone()); + guard_blob.put(block_root, blobs); } StoreOp::DeleteBlobs(block_root) => { @@ -1320,12 +1320,12 @@ impl, Cold: ItemStore> HotColdDB } /// Fetch a blobs sidecar from the store. - pub fn get_blobs(&self, block_root: &Hash256) -> Result>, Error> { + pub fn get_blobs(&self, block_root: &Hash256) -> Result>, Error> { let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db); match blobs_db.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? { Some(ref blobs_bytes) => { - let blobs = BlobsSidecar::from_ssz_bytes(blobs_bytes)?; + let blobs = BlobSidecarList::from_ssz_bytes(blobs_bytes)?; // FIXME(sean) I was attempting to use a blob cache here but was getting deadlocks, // may want to attempt to use one again self.blob_cache.lock().put(*block_root, blobs.clone()); diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 3056c29292..29fded5fa6 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -159,7 +159,8 @@ pub trait ItemStore: KeyValueStore + Sync + Send + Sized + 'stati pub enum StoreOp<'a, E: EthSpec> { PutBlock(Hash256, Arc>), PutState(Hash256, &'a BeaconState), - PutBlobs(Hash256, Arc>), + // TODO (mark): space can be optimized here by de-duplicating data + PutBlobs(Hash256, BlobSidecarList), PutOrphanedBlobsKey(Hash256), PutStateSummary(Hash256, HotStateSummary), PutStateTemporaryFlag(Hash256), diff --git a/consensus/tree_hash/src/impls.rs b/consensus/tree_hash/src/impls.rs index 899356f833..134be40219 100644 --- a/consensus/tree_hash/src/impls.rs +++ b/consensus/tree_hash/src/impls.rs @@ -1,5 +1,6 @@ use super::*; use ethereum_types::{H160, H256, U128, U256}; +use std::sync::Arc; fn int_to_hash256(int: u64) -> Hash256 { let mut bytes = [0; HASHSIZE]; @@ -186,6 +187,24 @@ impl TreeHash for H256 { } } +impl TreeHash for Arc { + fn tree_hash_type() -> TreeHashType { + T::tree_hash_type() + } + + fn tree_hash_packed_encoding(&self) -> PackedEncoding { + self.as_ref().tree_hash_packed_encoding() + } + + fn tree_hash_packing_factor() -> usize { + T::tree_hash_packing_factor() + } + + fn tree_hash_root(&self) -> Hash256 { + self.as_ref().tree_hash_root() + } +} + #[cfg(test)] mod test { use super::*; diff --git a/consensus/types/src/blob_sidecar.rs b/consensus/types/src/blob_sidecar.rs index 169c570d29..29eaadc584 100644 --- a/consensus/types/src/blob_sidecar.rs +++ b/consensus/types/src/blob_sidecar.rs @@ -6,6 +6,7 @@ use serde_derive::{Deserialize, Serialize}; use ssz::Encode; use ssz_derive::{Decode, Encode}; use ssz_types::VariableList; +use std::sync::Arc; use test_random_derive::TestRandom; use tree_hash_derive::TreeHash; @@ -47,7 +48,7 @@ pub struct BlobSidecar { pub kzg_proof: KzgProof, } -pub type BlobSidecarList = VariableList, ::MaxBlobsPerBlock>; +pub type BlobSidecarList = VariableList>, ::MaxBlobsPerBlock>; impl SignedRoot for BlobSidecar {} diff --git a/consensus/types/src/signed_blob.rs b/consensus/types/src/signed_blob.rs index f9ae481247..4eb28794ed 100644 --- a/consensus/types/src/signed_blob.rs +++ b/consensus/types/src/signed_blob.rs @@ -3,6 +3,7 @@ use derivative::Derivative; use serde_derive::{Deserialize, Serialize}; use ssz_derive::{Decode, Encode}; use ssz_types::VariableList; +use std::sync::Arc; use test_random_derive::TestRandom; use tree_hash_derive::TreeHash; @@ -23,7 +24,7 @@ use tree_hash_derive::TreeHash; #[arbitrary(bound = "T: EthSpec")] #[derivative(Hash(bound = "T: EthSpec"))] pub struct SignedBlobSidecar { - pub message: BlobSidecar, + pub message: Arc>, pub signature: Signature, }