diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 346e802cac..3249436736 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -44,11 +44,7 @@ pub async fn publish_block( beacon_block: block, blobs_sidecar: Arc::new(sidecar), }; - crate::publish_pubsub_message( - network_tx, - PubsubMessage::BeaconBlockAndBlobsSidecars(block_and_blobs.clone()), - )?; - block_and_blobs.into() + unimplemented!("Needs to be adjusted") } else { //FIXME(sean): This should probably return a specific no-blob-cached error code, beacon API coordination required return Err(warp_utils::reject::broadcast_without_import( diff --git a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs index a6b0deb529..ab61f45be9 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs @@ -597,9 +597,9 @@ fn handle_v1_response( ) })?; match fork_name { - ForkName::Eip4844 => Ok(Some(RPCResponse::SidecarByRoot( + ForkName::Eip4844 => Ok(Some(RPCResponse::SidecarByRoot(Arc::new( BlobSidecar::from_ssz_bytes(decoded_buffer)?, - ))), + )))), _ => Err(RPCError::ErrorResponse( RPCResponseErrorCode::InvalidRequest, "Invalid fork name for block and blobs by root".to_string(), diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 9e386ae516..63a3fc6a19 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -280,7 +280,7 @@ pub enum RPCResponse { LightClientBootstrap(LightClientBootstrap), /// A response to a get BLOBS_BY_ROOT request. - SidecarByRoot(BlobSidecar), + SidecarByRoot(Arc>), /// A PONG response to a PING request. Pong(Ping), diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index 4a1d125fa4..fc65959d1c 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -82,7 +82,7 @@ pub enum Response { /// A response to a LightClientUpdate request. LightClientBootstrap(LightClientBootstrap), /// A response to a get BLOBS_BY_ROOT request. - BlobsByRoot(Option>), + BlobsByRoot(Option>>), } impl std::convert::From> for RPCCodedResponse { diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index 73535dc83c..410389b119 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -65,7 +65,7 @@ use task_executor::TaskExecutor; use tokio::sync::mpsc; use types::{ Attestation, AttesterSlashing, Hash256, LightClientFinalityUpdate, LightClientOptimisticUpdate, - ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, + ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedBlobSidecar, SignedBlsToExecutionChange, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncCommitteeMessage, SyncSubnetId, }; @@ -444,20 +444,22 @@ impl WorkEvent { } /// Create a new `Work` event for some blobs sidecar. - pub fn gossip_block_and_blobs_sidecar( + pub fn gossip_signed_blob_sidecar( message_id: MessageId, peer_id: PeerId, peer_client: Client, - block_and_blobs: SignedBeaconBlockAndBlobsSidecar, + blob_index: u64, + signed_blob: Arc>, seen_timestamp: Duration, ) -> Self { Self { drop_during_sync: false, - work: Work::GossipBlockAndBlobsSidecar { + work: Work::GossipSignedBlobSidecar { message_id, peer_id, peer_client, - block_and_blobs, + blob_index, + signed_blob, seen_timestamp, }, } @@ -857,11 +859,12 @@ pub enum Work { block: Arc>, seen_timestamp: Duration, }, - GossipBlockAndBlobsSidecar { + GossipSignedBlobSidecar { message_id: MessageId, peer_id: PeerId, peer_client: Client, - block_and_blobs: SignedBeaconBlockAndBlobsSidecar, + blob_index: u64, + signed_blob: Arc>, seen_timestamp: Duration, }, DelayedImportBlock { @@ -965,7 +968,7 @@ impl Work { Work::GossipAggregate { .. } => GOSSIP_AGGREGATE, Work::GossipAggregateBatch { .. } => GOSSIP_AGGREGATE_BATCH, Work::GossipBlock { .. } => GOSSIP_BLOCK, - Work::GossipBlockAndBlobsSidecar { .. } => GOSSIP_BLOCK_AND_BLOBS_SIDECAR, + Work::GossipSignedBlobSidecar { .. } => GOSSIP_BLOCK_AND_BLOBS_SIDECAR, Work::DelayedImportBlock { .. } => DELAYED_IMPORT_BLOCK, Work::GossipVoluntaryExit { .. } => GOSSIP_VOLUNTARY_EXIT, Work::GossipProposerSlashing { .. } => GOSSIP_PROPOSER_SLASHING, @@ -1459,7 +1462,7 @@ impl BeaconProcessor { Work::GossipBlock { .. } => { gossip_block_queue.push(work, work_id, &self.log) } - Work::GossipBlockAndBlobsSidecar { .. } => { + Work::GossipSignedBlobSidecar { .. } => { gossip_block_and_blobs_sidecar_queue.push(work, work_id, &self.log) } Work::DelayedImportBlock { .. } => { @@ -1742,21 +1745,21 @@ impl BeaconProcessor { /* * Verification for blobs sidecars received on gossip. */ - Work::GossipBlockAndBlobsSidecar { + Work::GossipSignedBlobSidecar { message_id, peer_id, peer_client, - block_and_blobs: block_sidecar_pair, + blob_index, + signed_blob, seen_timestamp, } => task_spawner.spawn_async(async move { worker - .process_gossip_block( + .process_gossip_blob( message_id, peer_id, peer_client, - block_sidecar_pair.into(), - work_reprocessing_tx, - duplicate_cache, + blob_index, + signed_blob, seen_timestamp, ) .await diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 5e84cbb5f2..0bdebd88fe 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -17,12 +17,13 @@ use operation_pool::ReceivedPreCapella; use slog::{crit, debug, error, info, trace, warn}; use slot_clock::SlotClock; use ssz::Encode; +use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use store::hot_cold_store::HotColdDBError; use tokio::sync::mpsc; use types::{ Attestation, AttesterSlashing, EthSpec, Hash256, IndexedAttestation, LightClientFinalityUpdate, - LightClientOptimisticUpdate, ProposerSlashing, SignedAggregateAndProof, + LightClientOptimisticUpdate, ProposerSlashing, SignedAggregateAndProof, SignedBlobSidecar, SignedBlsToExecutionChange, SignedContributionAndProof, SignedVoluntaryExit, Slot, SubnetId, SyncCommitteeMessage, SyncSubnetId, }; @@ -647,6 +648,27 @@ impl Worker { } } + // TODO: docs + #[allow(clippy::too_many_arguments)] + pub async fn process_gossip_blob( + self, + message_id: MessageId, + peer_id: PeerId, + peer_client: Client, + blob_index: u64, + signed_blob: Arc>, + seen_duration: Duration, + ) { + // TODO: gossip verification + crit!(self.log, "UNIMPLEMENTED gossip blob verification"; + "peer_id" => %peer_id, + "client" => %peer_client, + "blob_topic" => blob_index, + "blob_index" => signed_blob.blob.index, + "blob_slot" => signed_blob.blob.slot + ); + } + /// Process the beacon block received from the gossip network and: /// /// - If it passes gossip propagation criteria, tell the network thread to forward it. diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 85781de1f1..3fb8feaf48 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -14,6 +14,7 @@ use slog::{debug, error, warn}; use slot_clock::SlotClock; use std::sync::Arc; use task_executor::TaskExecutor; +use types::blob_sidecar::BlobIdentifier; use types::light_client_bootstrap::LightClientBootstrap; use types::{Epoch, EthSpec, Hash256, Slot}; @@ -219,24 +220,49 @@ impl Worker { request_id: PeerRequestId, request: BlobsByRootRequest, ) { + // TODO: this code is grossly adjusted to free the blobs. Needs love <3 // Fetching blocks is async because it may have to hit the execution layer for payloads. executor.spawn( async move { + let requested_blobs = request.blob_ids.len(); let mut send_block_count = 0; let mut send_response = true; - for root in request.blob_ids.iter() { + for BlobIdentifier{ block_root: root, index } in request.blob_ids.into_iter() { match self .chain - .get_block_and_blobs_checking_early_attester_cache(root) + .get_block_and_blobs_checking_early_attester_cache(&root) .await { Ok(Some(block_and_blobs)) => { - self.send_response( - peer_id, - Response::BlobsByRoot(Some(block_and_blobs)), - request_id, - ); - send_block_count += 1; + // + // TODO: HORRIBLE NSFW CODE AHEAD + // + let types::SignedBeaconBlockAndBlobsSidecar {beacon_block, blobs_sidecar} = block_and_blobs; + let types::BlobsSidecar{ beacon_block_root, beacon_block_slot, blobs: blob_bundle, kzg_aggregated_proof }: types::BlobsSidecar<_> = blobs_sidecar.as_ref().clone(); + // TODO: this should be unreachable after this is addressed seriously, + // so for now let's be ok with a panic in the expect. + let block = beacon_block.message_eip4844().expect("We fucked up the block blob stuff"); + // Intentionally not accessing the list directly + for (known_index, blob) in blob_bundle.into_iter().enumerate() { + if (known_index as u64) == index { + let blob_sidecar = types::BlobSidecar{ + block_root: beacon_block_root, + index, + slot: beacon_block_slot, + block_parent_root: block.parent_root, + proposer_index: block.proposer_index, + blob, + kzg_commitment: block.body.blob_kzg_commitments[known_index].clone(), // TODO: needs to be stored in a more logical way so that this won't panic. + kzg_proof: kzg_aggregated_proof // TODO: yeah + }; + self.send_response( + peer_id, + Response::BlobsByRoot(Some(Arc::new(blob_sidecar))), + request_id, + ); + send_block_count += 1; + } + } } Ok(None) => { debug!( @@ -250,7 +276,6 @@ impl Worker { error!( self.log, "No blobs in the store for block root"; - "request" => ?request, "peer" => %peer_id, "block_root" => ?root ); @@ -329,7 +354,7 @@ impl Worker { self.log, "Received BlobsByRoot Request"; "peer" => %peer_id, - "requested" => request.blob_ids.len(), + "requested" => requested_blobs, "returned" => send_block_count ); @@ -813,12 +838,28 @@ impl Worker { for root in block_roots { match self.chain.get_blobs(&root, data_availability_boundary) { Ok(Some(blobs)) => { - blobs_sent += 1; - self.send_network_message(NetworkMessage::SendResponse { - peer_id, - response: Response::BlobsByRange(Some(Arc::new(blobs))), - id: request_id, - }); + // TODO: more GROSS code ahead. Reader beware + let types::BlobsSidecar{ beacon_block_root, beacon_block_slot, blobs: blob_bundle, kzg_aggregated_proof }: types::BlobsSidecar<_> = blobs; + + for (blob_index, blob) in blob_bundle.into_iter().enumerate() { + let blob_sidecar = types::BlobSidecar{ + block_root: beacon_block_root, + index: blob_index as u64, + slot: beacon_block_slot, + block_parent_root: Hash256::zero(), + proposer_index: 0, + blob, + kzg_commitment: types::KzgCommitment::default(), + kzg_proof: types::KzgProof::default(), + }; + + blobs_sent += 1; + self.send_network_message(NetworkMessage::SendResponse { + peer_id, + response: Response::BlobsByRange(Some(Arc::new(blob_sidecar))), + id: request_id, + }); + } } Ok(None) => { error!( diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index 31f2092049..a18ce4e7c8 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -204,13 +204,13 @@ impl Router { self.processor .on_blocks_by_root_response(peer_id, request_id, beacon_block); } - Response::BlobsByRange(beacon_blob) => { + Response::BlobsByRange(blob) => { self.processor - .on_blobs_by_range_response(peer_id, request_id, beacon_blob); + .on_blobs_by_range_response(peer_id, request_id, blob); } - Response::BlobsByRoot(beacon_blob) => { + Response::BlobsByRoot(blob) => { self.processor - .on_blobs_by_root_response(peer_id, request_id, beacon_blob); + .on_blobs_by_root_response(peer_id, request_id, blob); } Response::LightClientBootstrap(_) => unreachable!(), } @@ -250,12 +250,14 @@ impl Router { block, ); } - PubsubMessage::BeaconBlockAndBlobsSidecars(block_and_blobs) => { - self.processor.on_block_and_blobs_sidecar_gossip( + PubsubMessage::BlobSidecar(data) => { + let (blob_index, signed_blob) = *data; + self.processor.on_blob_sidecar_gossip( id, peer_id, self.network_globals.client(&peer_id), - block_and_blobs, + blob_index, + Arc::new(signed_blob), ); } PubsubMessage::VoluntaryExit(exit) => { diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index d0879babac..56a5f24586 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -18,10 +18,10 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use store::SyncCommitteeMessage; use tokio::sync::mpsc; use types::{ - Attestation, AttesterSlashing, BlobsSidecar, EthSpec, LightClientFinalityUpdate, + Attestation, AttesterSlashing, BlobSidecar, EthSpec, LightClientFinalityUpdate, LightClientOptimisticUpdate, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, - SignedBeaconBlockAndBlobsSidecar, SignedBlsToExecutionChange, SignedContributionAndProof, - SignedVoluntaryExit, SubnetId, SyncSubnetId, + SignedBlobSidecar, SignedBlsToExecutionChange, SignedContributionAndProof, SignedVoluntaryExit, + SubnetId, SyncSubnetId, }; /// Processes validated messages from the network. It relays necessary data to the syncing thread @@ -249,7 +249,7 @@ impl Processor { &mut self, peer_id: PeerId, request_id: RequestId, - blob_sidecar: Option>>, + blob_sidecar: Option>>, ) { trace!( self.log, @@ -310,7 +310,7 @@ impl Processor { &mut self, peer_id: PeerId, request_id: RequestId, - block_and_blobs: Option>, + blob_sidecar: Option>>, ) { let request_id = match request_id { RequestId::Sync(sync_id) => match sync_id { @@ -322,18 +322,18 @@ impl Processor { unreachable!("Batch syncing does not request BBRoot requests") } }, - RequestId::Router => unreachable!("All BBRoot requests belong to sync"), + RequestId::Router => unreachable!("All BlobsByRoot requests belong to sync"), }; trace!( self.log, - "Received BlockAndBlobssByRoot Response"; + "Received BlobsByRoot Response"; "peer" => %peer_id, ); - self.send_to_sync(SyncMessage::RpcBlockAndBlobs { - peer_id, + self.send_to_sync(SyncMessage::RpcBlobs { request_id, - block_and_blobs, + peer_id, + blob_sidecar, seen_timestamp: timestamp_now(), }); } @@ -359,18 +359,20 @@ impl Processor { )) } - pub fn on_block_and_blobs_sidecar_gossip( + pub fn on_blob_sidecar_gossip( &mut self, message_id: MessageId, peer_id: PeerId, peer_client: Client, - block_and_blobs: SignedBeaconBlockAndBlobsSidecar, + blob_index: u64, // TODO: add a type for the blob index + signed_blob: Arc>, ) { - self.send_beacon_processor_work(BeaconWorkEvent::gossip_block_and_blobs_sidecar( + self.send_beacon_processor_work(BeaconWorkEvent::gossip_signed_blob_sidecar( message_id, peer_id, peer_client, - block_and_blobs, + blob_index, + signed_blob, timestamp_now(), )) } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index fa171cd04b..2c8e0f047f 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -56,9 +56,7 @@ use std::ops::Sub; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; -use types::{ - BlobsSidecar, EthSpec, Hash256, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, Slot, -}; +use types::{BlobSidecar, EthSpec, Hash256, SignedBeaconBlock, Slot}; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync /// from a peer. If a peer is within this tolerance (forwards or backwards), it is treated as a @@ -106,15 +104,7 @@ pub enum SyncMessage { RpcBlobs { request_id: RequestId, peer_id: PeerId, - blob_sidecar: Option>>, - seen_timestamp: Duration, - }, - - /// A block and blobs have been received from the RPC. - RpcBlockAndBlobs { - request_id: RequestId, - peer_id: PeerId, - block_and_blobs: Option>, + blob_sidecar: Option>>, seen_timestamp: Duration, }, @@ -654,17 +644,6 @@ impl SyncManager { blob_sidecar, seen_timestamp, } => self.rpc_blobs_received(request_id, peer_id, blob_sidecar, seen_timestamp), - SyncMessage::RpcBlockAndBlobs { - request_id, - peer_id, - block_and_blobs, - seen_timestamp, - } => self.rpc_block_block_and_blobs_received( - request_id, - peer_id, - block_and_blobs, - seen_timestamp, - ), } } @@ -897,7 +876,7 @@ impl SyncManager { &mut self, request_id: RequestId, peer_id: PeerId, - maybe_sidecar: Option::EthSpec>>>, + maybe_blob: Option::EthSpec>>>, _seen_timestamp: Duration, ) { match request_id { @@ -907,48 +886,17 @@ impl SyncManager { RequestId::BackFillBlocks { .. } => { unreachable!("An only blocks request does not receive sidecars") } - RequestId::BackFillBlobs { id } => { - self.blobs_backfill_response(id, peer_id, maybe_sidecar.into()) + RequestId::BackFillBlobs { .. } => { + unimplemented!("Adjust backfill sync"); } RequestId::RangeBlocks { .. } => { unreachable!("Only-blocks range requests don't receive sidecars") } RequestId::RangeBlobs { id } => { - self.blobs_range_response(id, peer_id, maybe_sidecar.into()) + unimplemented!("Adjust range"); } } } - - fn rpc_block_block_and_blobs_received( - &mut self, - request_id: RequestId, - peer_id: PeerId, - block_sidecar_pair: Option>, - seen_timestamp: Duration, - ) { - match request_id { - RequestId::SingleBlock { id } => self.block_lookups.single_block_lookup_response( - id, - peer_id, - block_sidecar_pair.map(|block_sidecar_pair| block_sidecar_pair.into()), - seen_timestamp, - &mut self.network, - ), - RequestId::ParentLookup { id } => self.block_lookups.parent_lookup_response( - id, - peer_id, - block_sidecar_pair.map(|block_sidecar_pair| block_sidecar_pair.into()), - seen_timestamp, - &mut self.network, - ), - RequestId::BackFillBlocks { .. } - | RequestId::BackFillBlobs { .. } - | RequestId::RangeBlocks { .. } - | RequestId::RangeBlobs { .. } => unreachable!( - "since range requests are not block-glob coupled, this should never be reachable" - ), - } - } } impl From>> for BlockProcessResult { diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index ef8f872cbf..2da6a41e24 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -426,7 +426,7 @@ impl SyncNetworkContext { "count" => request.block_roots.len(), "peer" => %peer_id ); - Request::BlobsByRoot(request.into()) + unimplemented!("There is no longer such thing as a single block lookup, since we nede to ask for blobs and blocks separetely"); } else { trace!( self.log, @@ -467,7 +467,9 @@ impl SyncNetworkContext { "count" => request.block_roots.len(), "peer" => %peer_id ); - Request::BlobsByRoot(request.into()) + unimplemented!( + "Parent requests now need to interleave blocks and blobs or something like that." + ) } else { trace!( self.log,