diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 06c6cb31d3..fbd8d248c2 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -983,12 +983,23 @@ impl BeaconChain { Ok(self.get_block(block_root).await?.map(Arc::new)) } - pub async fn get_block_and_blobs_checking_early_attester_cache( + pub async fn get_blobs_checking_early_attester_cache( &self, - _block_root: &Hash256, - ) -> Result, Error> { - //TODO(sean) use the rpc blobs cache and revert this to the current block cache logic - Ok(Some(())) + block_root: &Hash256, + ) -> Result>, Error> { + // If there is no data availability boundary, the Eip4844 fork is disabled. + if let Some(finalized_data_availability_boundary) = + self.finalized_data_availability_boundary() + { + self.early_attester_cache + .get_blobs(*block_root) + .map_or_else( + || self.get_blobs(block_root, finalized_data_availability_boundary), + |blobs| Ok(Some(blobs)), + ) + } else { + Ok(None) + } } /// Returns the block at the given root, if any. @@ -3101,12 +3112,14 @@ impl BeaconChain { // margin, or younger (of higher epoch number). if block_epoch >= import_boundary { if let Some(blobs) = blobs { - //FIXME(sean) using this for debugging for now - info!( - self.log, "Writing blobs to store"; - "block_root" => ?block_root - ); - ops.push(StoreOp::PutBlobs(block_root, blobs)); + if !blobs.is_empty() { + //FIXME(sean) using this for debugging for now + info!( + self.log, "Writing blobs to store"; + "block_root" => ?block_root + ); + ops.push(StoreOp::PutBlobs(block_root, blobs)); + } } } } @@ -4860,7 +4873,7 @@ impl BeaconChain { ) .map_err(BlockProductionError::KzgError)?; - let blob_sidecars = VariableList::from( + let blob_sidecars = BlobSidecarList::from( blobs .into_iter() .enumerate() @@ -4873,7 +4886,7 @@ impl BeaconChain { .get(blob_index) .expect("KZG proof should exist for blob"); - Ok(BlobSidecar { + Ok(Arc::new(BlobSidecar { block_root: beacon_block_root, index: blob_index as u64, slot, @@ -4882,9 +4895,9 @@ impl BeaconChain { blob, kzg_commitment: *kzg_commitment, kzg_proof: *kzg_proof, - }) + })) }) - .collect::>, BlockProductionError>>()?, + .collect::, BlockProductionError>>()?, ); self.proposal_blob_cache diff --git a/beacon_node/http_api/src/block_id.rs b/beacon_node/http_api/src/block_id.rs index fed2921de3..36675f74be 100644 --- a/beacon_node/http_api/src/block_id.rs +++ b/beacon_node/http_api/src/block_id.rs @@ -4,7 +4,7 @@ use eth2::types::BlockId as CoreBlockId; use std::fmt; use std::str::FromStr; use std::sync::Arc; -use types::{BlobSidecar, Hash256, SignedBeaconBlock, SignedBlindedBeaconBlock, Slot}; +use types::{BlobSidecarList, Hash256, SignedBeaconBlock, SignedBlindedBeaconBlock, Slot}; /// Wraps `eth2::types::BlockId` and provides a simple way to obtain a block or root for a given /// `BlockId`. @@ -212,19 +212,16 @@ impl BlockId { } } - /// Return the `BlobsSidecar` identified by `self`. - pub async fn blobs_sidecar( + /// Return the `BlobSidecarList` identified by `self`. + pub async fn blob_sidecar_list( &self, chain: &BeaconChain, - ) -> Result>, warp::Rejection> { + ) -> Result, warp::Rejection> { let root = self.root(chain)?.0; - let Some(_data_availability_boundary) = chain.data_availability_boundary() else { - return Err(warp_utils::reject::custom_not_found("Eip4844 fork disabled".into())); - }; match chain.get_blobs(&root) { - Ok(Some(_blob)) => todo!(), // Jimmy's PR will fix this, + Ok(Some(blob_sidecar_list)) => Ok(blob_sidecar_list), Ok(None) => Err(warp_utils::reject::custom_not_found(format!( - "Blob with block root {} is not in the store", + "No blobs with block root {} found in the store", root ))), Err(e) => Err(warp_utils::reject::beacon_chain_error(e)), diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 6b0518a23c..797e8f72b4 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1293,6 +1293,45 @@ pub fn serve( }, ); + /* + * beacon/blobs + */ + + // GET beacon/blobs/{block_id} + let get_blobs = eth_v1 + .and(warp::path("beacon")) + .and(warp::path("blobs")) + .and(block_id_or_err) + .and(warp::path::end()) + .and(chain_filter.clone()) + .and(warp::header::optional::("accept")) + .and_then( + |block_id: BlockId, + chain: Arc>, + accept_header: Option| { + async move { + let blob_sidecar_list = block_id.blob_sidecar_list(&chain).await?; + + match accept_header { + Some(api_types::Accept::Ssz) => Response::builder() + .status(200) + .header("Content-Type", "application/octet-stream") + .body(blob_sidecar_list.as_ssz_bytes().into()) + .map_err(|e| { + warp_utils::reject::custom_server_error(format!( + "failed to create response: {}", + e + )) + }), + _ => Ok(warp::reply::json(&api_types::GenericResponse::from( + blob_sidecar_list, + )) + .into_response()), + } + } + }, + ); + /* * beacon/pool */ @@ -3498,41 +3537,6 @@ pub fn serve( ) }); - // GET lighthouse/beacon/blobs_sidecars/{block_id} - let get_lighthouse_blobs_sidecars = warp::path("lighthouse") - .and(warp::path("beacon")) - .and(warp::path("blobs_sidecars")) - .and(block_id_or_err) - .and(warp::path::end()) - .and(chain_filter.clone()) - .and(warp::header::optional::("accept")) - .and_then( - |block_id: BlockId, - chain: Arc>, - accept_header: Option| { - async move { - let blobs_sidecar = block_id.blobs_sidecar(&chain).await?; - - match accept_header { - Some(api_types::Accept::Ssz) => Response::builder() - .status(200) - .header("Content-Type", "application/octet-stream") - .body(blobs_sidecar.as_ssz_bytes().into()) - .map_err(|e| { - warp_utils::reject::custom_server_error(format!( - "failed to create response: {}", - e - )) - }), - _ => Ok(warp::reply::json(&api_types::GenericResponse::from( - blobs_sidecar, - )) - .into_response()), - } - } - }, - ); - let get_events = eth_v1 .and(warp::path("events")) .and(warp::path::end()) @@ -3627,6 +3631,7 @@ pub fn serve( .uor(get_beacon_block_attestations) .uor(get_beacon_blinded_block) .uor(get_beacon_block_root) + .uor(get_blobs) .uor(get_beacon_pool_attestations) .uor(get_beacon_pool_attester_slashings) .uor(get_beacon_pool_proposer_slashings) @@ -3672,7 +3677,6 @@ pub fn serve( .uor(get_lighthouse_attestation_performance) .uor(get_lighthouse_block_packing_efficiency) .uor(get_lighthouse_merge_readiness) - .uor(get_lighthouse_blobs_sidecars.boxed()) .uor(get_events) .recover(warp_utils::reject::handle_rejection), ) diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 5d4a8e3b05..6ffc042782 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -12,6 +12,7 @@ use lighthouse_network::rpc::*; use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo}; use slog::{debug, error, warn}; use slot_clock::SlotClock; +use std::collections::{hash_map::Entry, HashMap}; use std::sync::Arc; use task_executor::TaskExecutor; use types::blob_sidecar::BlobIdentifier; @@ -225,45 +226,36 @@ impl Worker { executor.spawn( async move { let requested_blobs = request.blob_ids.len(); - let send_block_count = 0; + let mut send_blob_count = 0; let mut send_response = true; - for BlobIdentifier{ block_root: root, index: _index } in request.blob_ids.into_iter() { - match self - .chain - .get_block_and_blobs_checking_early_attester_cache(&root) - .await - { - Ok(Some(())) => { - todo!(); - // // - // // TODO: HORRIBLE NSFW CODE AHEAD - // // - // let types::SignedBeaconBlockAndBlobsSidecar {beacon_block, blobs_sidecar} = block_and_blobs; - // let types::BlobsSidecar{ beacon_block_root, beacon_block_slot, blobs: blob_bundle, kzg_aggregated_proof }: types::BlobsSidecar<_> = blobs_sidecar.as_ref().clone(); - // // TODO: this should be unreachable after this is addressed seriously, - // // so for now let's be ok with a panic in the expect. - // let block = beacon_block.message_eip4844().expect("We fucked up the block blob stuff"); - // // Intentionally not accessing the list directly - // for (known_index, blob) in blob_bundle.into_iter().enumerate() { - // if (known_index as u64) == index { - // let blob_sidecar = types::BlobSidecar{ - // block_root: beacon_block_root, - // index, - // slot: beacon_block_slot, - // block_parent_root: block.parent_root, - // proposer_index: block.proposer_index, - // blob, - // kzg_commitment: block.body.blob_kzg_commitments[known_index], // TODO: needs to be stored in a more logical way so that this won't panic. - // kzg_proof: kzg_aggregated_proof // TODO: yeah - // }; - // self.send_response( - // peer_id, - // Response::BlobsByRoot(Some(Arc::new(blob_sidecar))), - // request_id, - // ); - // send_block_count += 1; - // } - // } + + let mut blob_list_results = HashMap::new(); + for BlobIdentifier{ block_root: root, index } in request.blob_ids.into_iter() { + let blob_list_result = match blob_list_results.entry(root) { + Entry::Vacant(entry) => { + entry.insert(self + .chain + .get_blobs_checking_early_attester_cache(&root) + .await) + } + Entry::Occupied(entry) => { + entry.into_mut() + } + }; + + match blob_list_result.as_ref() { + Ok(Some(blobs_sidecar_list)) => { + for blob_sidecar in blobs_sidecar_list.iter() { + if blob_sidecar.index == index { + self.send_response( + peer_id, + Response::BlobsByRoot(Some(blob_sidecar.clone())), + request_id, + ); + send_blob_count += 1; + break; + } + } } Ok(None) => { debug!( @@ -356,7 +348,7 @@ impl Worker { "Received BlobsByRoot Request"; "peer" => %peer_id, "requested" => requested_blobs, - "returned" => send_block_count + "returned" => send_blob_count ); // send stream termination @@ -837,36 +829,16 @@ impl Worker { let mut send_response = true; for root in block_roots { - match self.chain.get_blobs(&root) { - Ok(Some(_blobs)) => { - todo!(); - // // TODO: more GROSS code ahead. Reader beware - // let types::BlobsSidecar { - // beacon_block_root, - // beacon_block_slot, - // blobs: blob_bundle, - // kzg_aggregated_proof: _, - // }: types::BlobsSidecar<_> = blobs; - // - // for (blob_index, blob) in blob_bundle.into_iter().enumerate() { - // let blob_sidecar = types::BlobSidecar { - // block_root: beacon_block_root, - // index: blob_index as u64, - // slot: beacon_block_slot, - // block_parent_root: Hash256::zero(), - // proposer_index: 0, - // blob, - // kzg_commitment: types::KzgCommitment::default(), - // kzg_proof: types::KzgProof::default(), - // }; - // - // blobs_sent += 1; - // self.send_network_message(NetworkMessage::SendResponse { - // peer_id, - // response: Response::BlobsByRange(Some(Arc::new(blob_sidecar))), - // id: request_id, - // }); - // } + match self.chain.get_blobs(&root, data_availability_boundary) { + Ok(Some(blob_sidecar_list)) => { + for blob_sidecar in blob_sidecar_list.iter() { + blobs_sent += 1; + self.send_network_message(NetworkMessage::SendResponse { + peer_id, + response: Response::BlobsByRange(Some(blob_sidecar.clone())), + id: request_id, + }); + } } Ok(None) => { error!( diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 83d995833a..fc902d866f 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -569,11 +569,7 @@ impl, Cold: ItemStore> HotColdDB blobs_db.key_delete(DBColumn::BeaconBlob.into(), block_root.as_bytes()) } - pub fn put_blobs( - &self, - block_root: &Hash256, - blobs: BlobSidecarList, - ) -> Result<(), Error> { + pub fn put_blobs(&self, block_root: &Hash256, blobs: BlobSidecarList) -> Result<(), Error> { let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db); blobs_db.put_bytes( DBColumn::BeaconBlob.into(), @@ -587,7 +583,7 @@ impl, Cold: ItemStore> HotColdDB pub fn blobs_as_kv_store_ops( &self, key: &Hash256, - blobs: &BlobSidecarList, + blobs: BlobSidecarList, ops: &mut Vec, ) { let db_key = get_key_for_col(DBColumn::BeaconBlob.into(), key.as_bytes()); @@ -822,7 +818,7 @@ impl, Cold: ItemStore> HotColdDB } StoreOp::PutBlobs(block_root, blobs) => { - self.blobs_as_kv_store_ops(&block_root, &blobs, &mut key_value_batch); + self.blobs_as_kv_store_ops(&block_root, blobs, &mut key_value_batch); } StoreOp::PutStateSummary(state_root, summary) => { @@ -890,8 +886,8 @@ impl, Cold: ItemStore> HotColdDB StoreOp::PutBlobs(_, _) => true, StoreOp::DeleteBlobs(block_root) => { match self.get_blobs(block_root) { - Ok(Some(blobs_sidecar)) => { - blobs_to_delete.push(blobs_sidecar); + Ok(Some(blobs_sidecar_list)) => { + blobs_to_delete.push((*block_root, blobs_sidecar_list)); } Err(e) => { error!( @@ -930,8 +926,8 @@ impl, Cold: ItemStore> HotColdDB for op in blob_cache_ops.iter_mut() { let reverse_op = match op { StoreOp::PutBlobs(block_root, _) => StoreOp::DeleteBlobs(*block_root), - StoreOp::DeleteBlobs(block_root) => match blobs_to_delete.pop() { - Some(blobs) => StoreOp::PutBlobs(*block_root, blobs), + StoreOp::DeleteBlobs(_) => match blobs_to_delete.pop() { + Some((block_root, blobs)) => StoreOp::PutBlobs(block_root, blobs), None => return Err(HotColdDBError::Rollback.into()), }, _ => return Err(HotColdDBError::Rollback.into()), @@ -977,7 +973,7 @@ impl, Cold: ItemStore> HotColdDB for op in blob_cache_ops { match op { StoreOp::PutBlobs(block_root, blobs) => { - guard_blob.put(block_root, blobs.clone()); + guard_blob.put(block_root, blobs); } StoreOp::DeleteBlobs(block_root) => { diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 47f0049fc2..179b099de3 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -160,6 +160,7 @@ pub trait ItemStore: KeyValueStore + Sync + Send + Sized + 'stati pub enum StoreOp<'a, E: EthSpec> { PutBlock(Hash256, Arc>), PutState(Hash256, &'a BeaconState), + // TODO (mark): space can be optimized here by de-duplicating data PutBlobs(Hash256, BlobSidecarList), PutOrphanedBlobsKey(Hash256), PutStateSummary(Hash256, HotStateSummary), diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index fec41b8bca..a57c2ca3d7 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -658,15 +658,13 @@ impl BeaconNodeHttpClient { Ok(path) } - /// Path for `lighthouse/beacon/blobs_sidecars/{block_id}` - pub fn get_blobs_sidecar_path(&self, block_id: BlockId) -> Result { - let mut path = self.server.full.clone(); - + /// Path for `v1/beacon/blobs/{block_id}` + pub fn get_blobs_path(&self, block_id: BlockId) -> Result { + let mut path = self.eth_path(V1)?; path.path_segments_mut() .map_err(|()| Error::InvalidUrl(self.server.clone()))? - .push("lighthouse") .push("beacon") - .push("blobs_sidecars") + .push("blobs") .push(&block_id.to_string()); Ok(path) } @@ -698,14 +696,14 @@ impl BeaconNodeHttpClient { Ok(Some(response.json().await?)) } - /// `GET lighthouse/beacon/blobs_sidecars/{block_id}` + /// `GET v1/beacon/blobs/{block_id}` /// /// Returns `Ok(None)` on a 404 error. - pub async fn get_blobs_sidecar( + pub async fn get_blobs( &self, block_id: BlockId, ) -> Result>>, Error> { - let path = self.get_blobs_sidecar_path(block_id)?; + let path = self.get_blobs_path(block_id)?; let response = match self.get_response(path, |b| b).await.optional()? { Some(res) => res, None => return Ok(None), diff --git a/consensus/tree_hash/src/impls.rs b/consensus/tree_hash/src/impls.rs index 899356f833..134be40219 100644 --- a/consensus/tree_hash/src/impls.rs +++ b/consensus/tree_hash/src/impls.rs @@ -1,5 +1,6 @@ use super::*; use ethereum_types::{H160, H256, U128, U256}; +use std::sync::Arc; fn int_to_hash256(int: u64) -> Hash256 { let mut bytes = [0; HASHSIZE]; @@ -186,6 +187,24 @@ impl TreeHash for H256 { } } +impl TreeHash for Arc { + fn tree_hash_type() -> TreeHashType { + T::tree_hash_type() + } + + fn tree_hash_packed_encoding(&self) -> PackedEncoding { + self.as_ref().tree_hash_packed_encoding() + } + + fn tree_hash_packing_factor() -> usize { + T::tree_hash_packing_factor() + } + + fn tree_hash_root(&self) -> Hash256 { + self.as_ref().tree_hash_root() + } +} + #[cfg(test)] mod test { use super::*; diff --git a/consensus/types/src/blob_sidecar.rs b/consensus/types/src/blob_sidecar.rs index 4d23771daa..3bc6559a01 100644 --- a/consensus/types/src/blob_sidecar.rs +++ b/consensus/types/src/blob_sidecar.rs @@ -47,8 +47,6 @@ pub struct BlobSidecar { pub kzg_proof: KzgProof, } -pub type BlobSidecarList = VariableList, ::MaxBlobsPerBlock>; -//TODO(sean) is there any other way around this? need it arc blobs for caching in multiple places pub type BlobSidecarList = VariableList>, ::MaxBlobsPerBlock>; pub type Blobs = VariableList, ::MaxExtraDataBytes>; diff --git a/consensus/types/src/signed_blob.rs b/consensus/types/src/signed_blob.rs index 6b2279ce89..aaab02ca78 100644 --- a/consensus/types/src/signed_blob.rs +++ b/consensus/types/src/signed_blob.rs @@ -7,6 +7,7 @@ use derivative::Derivative; use serde_derive::{Deserialize, Serialize}; use ssz_derive::{Decode, Encode}; use ssz_types::VariableList; +use std::sync::Arc; use test_random_derive::TestRandom; use tree_hash::TreeHash; use tree_hash_derive::TreeHash; @@ -28,7 +29,7 @@ use tree_hash_derive::TreeHash; #[arbitrary(bound = "T: EthSpec")] #[derivative(Hash(bound = "T: EthSpec"))] pub struct SignedBlobSidecar { - pub message: BlobSidecar, + pub message: Arc>, pub signature: Signature, }