Merge branch 'deneb-free-blobs' of https://github.com/sigp/lighthouse into partial-processing

This commit is contained in:
realbigsean
2023-03-21 15:48:57 -04:00
10 changed files with 152 additions and 153 deletions

View File

@@ -983,12 +983,23 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(self.get_block(block_root).await?.map(Arc::new))
}
pub async fn get_block_and_blobs_checking_early_attester_cache(
pub async fn get_blobs_checking_early_attester_cache(
&self,
_block_root: &Hash256,
) -> Result<Option<()>, Error> {
//TODO(sean) use the rpc blobs cache and revert this to the current block cache logic
Ok(Some(()))
block_root: &Hash256,
) -> Result<Option<BlobSidecarList<T::EthSpec>>, Error> {
// If there is no data availability boundary, the Eip4844 fork is disabled.
if let Some(finalized_data_availability_boundary) =
self.finalized_data_availability_boundary()
{
self.early_attester_cache
.get_blobs(*block_root)
.map_or_else(
|| self.get_blobs(block_root, finalized_data_availability_boundary),
|blobs| Ok(Some(blobs)),
)
} else {
Ok(None)
}
}
/// Returns the block at the given root, if any.
@@ -3101,6 +3112,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// margin, or younger (of higher epoch number).
if block_epoch >= import_boundary {
if let Some(blobs) = blobs {
if !blobs.is_empty() {
//FIXME(sean) using this for debugging for now
info!(
self.log, "Writing blobs to store";
@@ -3110,6 +3122,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}
}
}
let txn_lock = self.store.hot_db.begin_rw_transaction();
if let Err(e) = self.store.do_atomically_with_block_and_blobs_cache(ops) {
@@ -4860,7 +4873,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
)
.map_err(BlockProductionError::KzgError)?;
let blob_sidecars = VariableList::from(
let blob_sidecars = BlobSidecarList::from(
blobs
.into_iter()
.enumerate()
@@ -4873,7 +4886,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.get(blob_index)
.expect("KZG proof should exist for blob");
Ok(BlobSidecar {
Ok(Arc::new(BlobSidecar {
block_root: beacon_block_root,
index: blob_index as u64,
slot,
@@ -4882,9 +4895,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
blob,
kzg_commitment: *kzg_commitment,
kzg_proof: *kzg_proof,
}))
})
})
.collect::<Result<Vec<BlobSidecar<T::EthSpec>>, BlockProductionError>>()?,
.collect::<Result<Vec<_>, BlockProductionError>>()?,
);
self.proposal_blob_cache

View File

@@ -4,7 +4,7 @@ use eth2::types::BlockId as CoreBlockId;
use std::fmt;
use std::str::FromStr;
use std::sync::Arc;
use types::{BlobSidecar, Hash256, SignedBeaconBlock, SignedBlindedBeaconBlock, Slot};
use types::{BlobSidecarList, Hash256, SignedBeaconBlock, SignedBlindedBeaconBlock, Slot};
/// Wraps `eth2::types::BlockId` and provides a simple way to obtain a block or root for a given
/// `BlockId`.
@@ -212,19 +212,16 @@ impl BlockId {
}
}
/// Return the `BlobsSidecar` identified by `self`.
pub async fn blobs_sidecar<T: BeaconChainTypes>(
/// Return the `BlobSidecarList` identified by `self`.
pub async fn blob_sidecar_list<T: BeaconChainTypes>(
&self,
chain: &BeaconChain<T>,
) -> Result<Arc<BlobSidecar<T::EthSpec>>, warp::Rejection> {
) -> Result<BlobSidecarList<T::EthSpec>, warp::Rejection> {
let root = self.root(chain)?.0;
let Some(_data_availability_boundary) = chain.data_availability_boundary() else {
return Err(warp_utils::reject::custom_not_found("Eip4844 fork disabled".into()));
};
match chain.get_blobs(&root) {
Ok(Some(_blob)) => todo!(), // Jimmy's PR will fix this,
Ok(Some(blob_sidecar_list)) => Ok(blob_sidecar_list),
Ok(None) => Err(warp_utils::reject::custom_not_found(format!(
"Blob with block root {} is not in the store",
"No blobs with block root {} found in the store",
root
))),
Err(e) => Err(warp_utils::reject::beacon_chain_error(e)),

View File

@@ -1293,6 +1293,45 @@ pub fn serve<T: BeaconChainTypes>(
},
);
/*
* beacon/blobs
*/
// GET beacon/blobs/{block_id}
let get_blobs = eth_v1
.and(warp::path("beacon"))
.and(warp::path("blobs"))
.and(block_id_or_err)
.and(warp::path::end())
.and(chain_filter.clone())
.and(warp::header::optional::<api_types::Accept>("accept"))
.and_then(
|block_id: BlockId,
chain: Arc<BeaconChain<T>>,
accept_header: Option<api_types::Accept>| {
async move {
let blob_sidecar_list = block_id.blob_sidecar_list(&chain).await?;
match accept_header {
Some(api_types::Accept::Ssz) => Response::builder()
.status(200)
.header("Content-Type", "application/octet-stream")
.body(blob_sidecar_list.as_ssz_bytes().into())
.map_err(|e| {
warp_utils::reject::custom_server_error(format!(
"failed to create response: {}",
e
))
}),
_ => Ok(warp::reply::json(&api_types::GenericResponse::from(
blob_sidecar_list,
))
.into_response()),
}
}
},
);
/*
* beacon/pool
*/
@@ -3498,41 +3537,6 @@ pub fn serve<T: BeaconChainTypes>(
)
});
// GET lighthouse/beacon/blobs_sidecars/{block_id}
let get_lighthouse_blobs_sidecars = warp::path("lighthouse")
.and(warp::path("beacon"))
.and(warp::path("blobs_sidecars"))
.and(block_id_or_err)
.and(warp::path::end())
.and(chain_filter.clone())
.and(warp::header::optional::<api_types::Accept>("accept"))
.and_then(
|block_id: BlockId,
chain: Arc<BeaconChain<T>>,
accept_header: Option<api_types::Accept>| {
async move {
let blobs_sidecar = block_id.blobs_sidecar(&chain).await?;
match accept_header {
Some(api_types::Accept::Ssz) => Response::builder()
.status(200)
.header("Content-Type", "application/octet-stream")
.body(blobs_sidecar.as_ssz_bytes().into())
.map_err(|e| {
warp_utils::reject::custom_server_error(format!(
"failed to create response: {}",
e
))
}),
_ => Ok(warp::reply::json(&api_types::GenericResponse::from(
blobs_sidecar,
))
.into_response()),
}
}
},
);
let get_events = eth_v1
.and(warp::path("events"))
.and(warp::path::end())
@@ -3627,6 +3631,7 @@ pub fn serve<T: BeaconChainTypes>(
.uor(get_beacon_block_attestations)
.uor(get_beacon_blinded_block)
.uor(get_beacon_block_root)
.uor(get_blobs)
.uor(get_beacon_pool_attestations)
.uor(get_beacon_pool_attester_slashings)
.uor(get_beacon_pool_proposer_slashings)
@@ -3672,7 +3677,6 @@ pub fn serve<T: BeaconChainTypes>(
.uor(get_lighthouse_attestation_performance)
.uor(get_lighthouse_block_packing_efficiency)
.uor(get_lighthouse_merge_readiness)
.uor(get_lighthouse_blobs_sidecars.boxed())
.uor(get_events)
.recover(warp_utils::reject::handle_rejection),
)

View File

@@ -12,6 +12,7 @@ use lighthouse_network::rpc::*;
use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo};
use slog::{debug, error, warn};
use slot_clock::SlotClock;
use std::collections::{hash_map::Entry, HashMap};
use std::sync::Arc;
use task_executor::TaskExecutor;
use types::blob_sidecar::BlobIdentifier;
@@ -225,45 +226,36 @@ impl<T: BeaconChainTypes> Worker<T> {
executor.spawn(
async move {
let requested_blobs = request.blob_ids.len();
let send_block_count = 0;
let mut send_blob_count = 0;
let mut send_response = true;
for BlobIdentifier{ block_root: root, index: _index } in request.blob_ids.into_iter() {
match self
let mut blob_list_results = HashMap::new();
for BlobIdentifier{ block_root: root, index } in request.blob_ids.into_iter() {
let blob_list_result = match blob_list_results.entry(root) {
Entry::Vacant(entry) => {
entry.insert(self
.chain
.get_block_and_blobs_checking_early_attester_cache(&root)
.await
{
Ok(Some(())) => {
todo!();
// //
// // TODO: HORRIBLE NSFW CODE AHEAD
// //
// let types::SignedBeaconBlockAndBlobsSidecar {beacon_block, blobs_sidecar} = block_and_blobs;
// let types::BlobsSidecar{ beacon_block_root, beacon_block_slot, blobs: blob_bundle, kzg_aggregated_proof }: types::BlobsSidecar<_> = blobs_sidecar.as_ref().clone();
// // TODO: this should be unreachable after this is addressed seriously,
// // so for now let's be ok with a panic in the expect.
// let block = beacon_block.message_eip4844().expect("We fucked up the block blob stuff");
// // Intentionally not accessing the list directly
// for (known_index, blob) in blob_bundle.into_iter().enumerate() {
// if (known_index as u64) == index {
// let blob_sidecar = types::BlobSidecar{
// block_root: beacon_block_root,
// index,
// slot: beacon_block_slot,
// block_parent_root: block.parent_root,
// proposer_index: block.proposer_index,
// blob,
// kzg_commitment: block.body.blob_kzg_commitments[known_index], // TODO: needs to be stored in a more logical way so that this won't panic.
// kzg_proof: kzg_aggregated_proof // TODO: yeah
// };
// self.send_response(
// peer_id,
// Response::BlobsByRoot(Some(Arc::new(blob_sidecar))),
// request_id,
// );
// send_block_count += 1;
// }
// }
.get_blobs_checking_early_attester_cache(&root)
.await)
}
Entry::Occupied(entry) => {
entry.into_mut()
}
};
match blob_list_result.as_ref() {
Ok(Some(blobs_sidecar_list)) => {
for blob_sidecar in blobs_sidecar_list.iter() {
if blob_sidecar.index == index {
self.send_response(
peer_id,
Response::BlobsByRoot(Some(blob_sidecar.clone())),
request_id,
);
send_blob_count += 1;
break;
}
}
}
Ok(None) => {
debug!(
@@ -356,7 +348,7 @@ impl<T: BeaconChainTypes> Worker<T> {
"Received BlobsByRoot Request";
"peer" => %peer_id,
"requested" => requested_blobs,
"returned" => send_block_count
"returned" => send_blob_count
);
// send stream termination
@@ -837,36 +829,16 @@ impl<T: BeaconChainTypes> Worker<T> {
let mut send_response = true;
for root in block_roots {
match self.chain.get_blobs(&root) {
Ok(Some(_blobs)) => {
todo!();
// // TODO: more GROSS code ahead. Reader beware
// let types::BlobsSidecar {
// beacon_block_root,
// beacon_block_slot,
// blobs: blob_bundle,
// kzg_aggregated_proof: _,
// }: types::BlobsSidecar<_> = blobs;
//
// for (blob_index, blob) in blob_bundle.into_iter().enumerate() {
// let blob_sidecar = types::BlobSidecar {
// block_root: beacon_block_root,
// index: blob_index as u64,
// slot: beacon_block_slot,
// block_parent_root: Hash256::zero(),
// proposer_index: 0,
// blob,
// kzg_commitment: types::KzgCommitment::default(),
// kzg_proof: types::KzgProof::default(),
// };
//
// blobs_sent += 1;
// self.send_network_message(NetworkMessage::SendResponse {
// peer_id,
// response: Response::BlobsByRange(Some(Arc::new(blob_sidecar))),
// id: request_id,
// });
// }
match self.chain.get_blobs(&root, data_availability_boundary) {
Ok(Some(blob_sidecar_list)) => {
for blob_sidecar in blob_sidecar_list.iter() {
blobs_sent += 1;
self.send_network_message(NetworkMessage::SendResponse {
peer_id,
response: Response::BlobsByRange(Some(blob_sidecar.clone())),
id: request_id,
});
}
}
Ok(None) => {
error!(

View File

@@ -569,11 +569,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
blobs_db.key_delete(DBColumn::BeaconBlob.into(), block_root.as_bytes())
}
pub fn put_blobs(
&self,
block_root: &Hash256,
blobs: BlobSidecarList<E>,
) -> Result<(), Error> {
pub fn put_blobs(&self, block_root: &Hash256, blobs: BlobSidecarList<E>) -> Result<(), Error> {
let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);
blobs_db.put_bytes(
DBColumn::BeaconBlob.into(),
@@ -587,7 +583,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
pub fn blobs_as_kv_store_ops(
&self,
key: &Hash256,
blobs: &BlobSidecarList<E>,
blobs: BlobSidecarList<E>,
ops: &mut Vec<KeyValueStoreOp>,
) {
let db_key = get_key_for_col(DBColumn::BeaconBlob.into(), key.as_bytes());
@@ -822,7 +818,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
StoreOp::PutBlobs(block_root, blobs) => {
self.blobs_as_kv_store_ops(&block_root, &blobs, &mut key_value_batch);
self.blobs_as_kv_store_ops(&block_root, blobs, &mut key_value_batch);
}
StoreOp::PutStateSummary(state_root, summary) => {
@@ -890,8 +886,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
StoreOp::PutBlobs(_, _) => true,
StoreOp::DeleteBlobs(block_root) => {
match self.get_blobs(block_root) {
Ok(Some(blobs_sidecar)) => {
blobs_to_delete.push(blobs_sidecar);
Ok(Some(blobs_sidecar_list)) => {
blobs_to_delete.push((*block_root, blobs_sidecar_list));
}
Err(e) => {
error!(
@@ -930,8 +926,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
for op in blob_cache_ops.iter_mut() {
let reverse_op = match op {
StoreOp::PutBlobs(block_root, _) => StoreOp::DeleteBlobs(*block_root),
StoreOp::DeleteBlobs(block_root) => match blobs_to_delete.pop() {
Some(blobs) => StoreOp::PutBlobs(*block_root, blobs),
StoreOp::DeleteBlobs(_) => match blobs_to_delete.pop() {
Some((block_root, blobs)) => StoreOp::PutBlobs(block_root, blobs),
None => return Err(HotColdDBError::Rollback.into()),
},
_ => return Err(HotColdDBError::Rollback.into()),
@@ -977,7 +973,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
for op in blob_cache_ops {
match op {
StoreOp::PutBlobs(block_root, blobs) => {
guard_blob.put(block_root, blobs.clone());
guard_blob.put(block_root, blobs);
}
StoreOp::DeleteBlobs(block_root) => {

View File

@@ -160,6 +160,7 @@ pub trait ItemStore<E: EthSpec>: KeyValueStore<E> + Sync + Send + Sized + 'stati
pub enum StoreOp<'a, E: EthSpec> {
PutBlock(Hash256, Arc<SignedBeaconBlock<E>>),
PutState(Hash256, &'a BeaconState<E>),
// TODO (mark): space can be optimized here by de-duplicating data
PutBlobs(Hash256, BlobSidecarList<E>),
PutOrphanedBlobsKey(Hash256),
PutStateSummary(Hash256, HotStateSummary),

View File

@@ -658,15 +658,13 @@ impl BeaconNodeHttpClient {
Ok(path)
}
/// Path for `lighthouse/beacon/blobs_sidecars/{block_id}`
pub fn get_blobs_sidecar_path(&self, block_id: BlockId) -> Result<Url, Error> {
let mut path = self.server.full.clone();
/// Path for `v1/beacon/blobs/{block_id}`
pub fn get_blobs_path(&self, block_id: BlockId) -> Result<Url, Error> {
let mut path = self.eth_path(V1)?;
path.path_segments_mut()
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
.push("lighthouse")
.push("beacon")
.push("blobs_sidecars")
.push("blobs")
.push(&block_id.to_string());
Ok(path)
}
@@ -698,14 +696,14 @@ impl BeaconNodeHttpClient {
Ok(Some(response.json().await?))
}
/// `GET lighthouse/beacon/blobs_sidecars/{block_id}`
/// `GET v1/beacon/blobs/{block_id}`
///
/// Returns `Ok(None)` on a 404 error.
pub async fn get_blobs_sidecar<T: EthSpec>(
pub async fn get_blobs<T: EthSpec>(
&self,
block_id: BlockId,
) -> Result<Option<GenericResponse<BlobSidecarList<T>>>, Error> {
let path = self.get_blobs_sidecar_path(block_id)?;
let path = self.get_blobs_path(block_id)?;
let response = match self.get_response(path, |b| b).await.optional()? {
Some(res) => res,
None => return Ok(None),

View File

@@ -1,5 +1,6 @@
use super::*;
use ethereum_types::{H160, H256, U128, U256};
use std::sync::Arc;
fn int_to_hash256(int: u64) -> Hash256 {
let mut bytes = [0; HASHSIZE];
@@ -186,6 +187,24 @@ impl TreeHash for H256 {
}
}
impl<T: TreeHash> TreeHash for Arc<T> {
fn tree_hash_type() -> TreeHashType {
T::tree_hash_type()
}
fn tree_hash_packed_encoding(&self) -> PackedEncoding {
self.as_ref().tree_hash_packed_encoding()
}
fn tree_hash_packing_factor() -> usize {
T::tree_hash_packing_factor()
}
fn tree_hash_root(&self) -> Hash256 {
self.as_ref().tree_hash_root()
}
}
#[cfg(test)]
mod test {
use super::*;

View File

@@ -47,8 +47,6 @@ pub struct BlobSidecar<T: EthSpec> {
pub kzg_proof: KzgProof,
}
pub type BlobSidecarList<T> = VariableList<BlobSidecar<T>, <T as EthSpec>::MaxBlobsPerBlock>;
//TODO(sean) is there any other way around this? need it arc blobs for caching in multiple places
pub type BlobSidecarList<T> =
VariableList<Arc<BlobSidecar<T>>, <T as EthSpec>::MaxBlobsPerBlock>;
pub type Blobs<T> = VariableList<Blob<T>, <T as EthSpec>::MaxExtraDataBytes>;

View File

@@ -7,6 +7,7 @@ use derivative::Derivative;
use serde_derive::{Deserialize, Serialize};
use ssz_derive::{Decode, Encode};
use ssz_types::VariableList;
use std::sync::Arc;
use test_random_derive::TestRandom;
use tree_hash::TreeHash;
use tree_hash_derive::TreeHash;
@@ -28,7 +29,7 @@ use tree_hash_derive::TreeHash;
#[arbitrary(bound = "T: EthSpec")]
#[derivative(Hash(bound = "T: EthSpec"))]
pub struct SignedBlobSidecar<T: EthSpec> {
pub message: BlobSidecar<T>,
pub message: Arc<BlobSidecar<T>>,
pub signature: Signature,
}