From 3c18e1a3a4c0a4895dcd08e79e34b6c59420edd9 Mon Sep 17 00:00:00 2001 From: Divma <26765164+divagant-martian@users.noreply.github.com> Date: Thu, 16 Mar 2023 19:20:39 -0500 Subject: [PATCH 1/2] thread blocks and blobs to sync (#4100) * thread blocks and blobs to sync * satisfy dead code analysis --- beacon_node/network/src/router/processor.rs | 4 +- beacon_node/network/src/sync/manager.rs | 126 ++++++++++-------- .../network/src/sync/network_context.rs | 24 ++-- 3 files changed, 83 insertions(+), 71 deletions(-) diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 56a5f24586..76962b373f 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -258,7 +258,7 @@ impl Processor { ); if let RequestId::Sync(id) = request_id { - self.send_to_sync(SyncMessage::RpcBlobs { + self.send_to_sync(SyncMessage::RpcBlob { peer_id, request_id: id, blob_sidecar, @@ -330,7 +330,7 @@ impl Processor { "Received BlobsByRoot Response"; "peer" => %peer_id, ); - self.send_to_sync(SyncMessage::RpcBlobs { + self.send_to_sync(SyncMessage::RpcBlob { request_id, peer_id, blob_sidecar, diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 768b95273e..43921b585a 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -35,7 +35,7 @@ use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart}; use super::block_lookups::BlockLookups; -use super::network_context::{BlockOrBlobs, SyncNetworkContext}; +use super::network_context::{BlockOrBlob, SyncNetworkContext}; use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent}; @@ -86,6 +86,10 @@ pub enum RequestId { RangeBlobs { id: Id }, } +// TODO(diva) I'm updating functions what at a time, but this should be revisited because I think +// some code paths that are split for blobs and blocks can be made just one after sync as a whole +// is updated. + #[derive(Debug)] /// A message that can be sent to the sync manager thread. pub enum SyncMessage { @@ -101,7 +105,7 @@ pub enum SyncMessage { }, /// A blob has been received from the RPC. - RpcBlobs { + RpcBlob { request_id: RequestId, peer_id: PeerId, blob_sidecar: Option>>, @@ -554,7 +558,12 @@ impl SyncManager { beacon_block, seen_timestamp, } => { - self.rpc_block_received(request_id, peer_id, beacon_block, seen_timestamp); + self.rpc_block_or_blob_received( + request_id, + peer_id, + beacon_block.into(), + seen_timestamp, + ); } SyncMessage::UnknownBlock(peer_id, block, block_root) => { // If we are not synced or within SLOT_IMPORT_TOLERANCE of the block, ignore @@ -638,12 +647,17 @@ impl SyncManager { .block_lookups .parent_chain_processed(chain_hash, result, &mut self.network), }, - SyncMessage::RpcBlobs { + SyncMessage::RpcBlob { request_id, peer_id, blob_sidecar, seen_timestamp, - } => self.rpc_blobs_received(request_id, peer_id, blob_sidecar, seen_timestamp), + } => self.rpc_block_or_blob_received( + request_id, + peer_id, + blob_sidecar.into(), + seen_timestamp, + ), } } @@ -702,30 +716,50 @@ impl SyncManager { } } - fn rpc_block_received( + fn rpc_block_or_blob_received( &mut self, request_id: RequestId, peer_id: PeerId, - beacon_block: Option>>, + block_or_blob: BlockOrBlob, seen_timestamp: Duration, ) { match request_id { - RequestId::SingleBlock { id } => self.block_lookups.single_block_lookup_response( - id, - peer_id, - beacon_block.map(|block| block.into()), - seen_timestamp, - &mut self.network, - ), - RequestId::ParentLookup { id } => self.block_lookups.parent_lookup_response( - id, - peer_id, - beacon_block.map(|block| block.into()), - seen_timestamp, - &mut self.network, - ), + RequestId::SingleBlock { id } => { + // TODO(diva) adjust when dealing with by root requests. This code is here to + // satisfy dead code analysis + match block_or_blob { + BlockOrBlob::Block(maybe_block) => { + self.block_lookups.single_block_lookup_response( + id, + peer_id, + maybe_block.map(BlockWrapper::Block), + seen_timestamp, + &mut self.network, + ) + } + BlockOrBlob::Sidecar(_) => unimplemented!("Mimatch between BlockWrapper and what the network receives needs to be handled first."), + } + } + RequestId::ParentLookup { id } => { + // TODO(diva) adjust when dealing with by root requests. This code is here to + // satisfy dead code analysis + match block_or_blob { + BlockOrBlob::Block(maybe_block) => self.block_lookups.parent_lookup_response( + id, + peer_id, + maybe_block.map(BlockWrapper::Block), + seen_timestamp, + &mut self.network, + ), + BlockOrBlob::Sidecar(_) => unimplemented!("Mimatch between BlockWrapper and what the network receives needs to be handled first."), + } + } RequestId::BackFillBlocks { id } => { - let is_stream_terminator = beacon_block.is_none(); + let maybe_block = match block_or_blob { + BlockOrBlob::Block(maybe_block) => maybe_block, + BlockOrBlob::Sidecar(_) => todo!("I think this is unreachable"), + }; + let is_stream_terminator = maybe_block.is_none(); if let Some(batch_id) = self .network .backfill_sync_only_blocks_response(id, is_stream_terminator) @@ -735,7 +769,7 @@ impl SyncManager { batch_id, &peer_id, id, - beacon_block.map(|block| block.into()), + maybe_block.map(|block| block.into()), ) { Ok(ProcessResult::SyncCompleted) => self.update_sync_state(), Ok(ProcessResult::Successful) => {} @@ -748,7 +782,11 @@ impl SyncManager { } } RequestId::RangeBlocks { id } => { - let is_stream_terminator = beacon_block.is_none(); + let maybe_block = match block_or_blob { + BlockOrBlob::Block(maybe_block) => maybe_block, + BlockOrBlob::Sidecar(_) => todo!("I think this should be unreachable, since this is a range only-blocks request, and the network should not accept this chunk at all. Needs better handling"), + }; + let is_stream_terminator = maybe_block.is_none(); if let Some((chain_id, batch_id)) = self .network .range_sync_block_response(id, is_stream_terminator) @@ -759,28 +797,28 @@ impl SyncManager { chain_id, batch_id, id, - beacon_block.map(|block| block.into()), + maybe_block.map(|block| block.into()), ); self.update_sync_state(); } } RequestId::BackFillBlobs { id } => { - self.blobs_backfill_response(id, peer_id, beacon_block.into()) + self.backfill_block_and_blobs_response(id, peer_id, block_or_blob) } RequestId::RangeBlobs { id } => { - self.blobs_range_response(id, peer_id, beacon_block.into()) + self.range_block_and_blobs_response(id, peer_id, block_or_blob) } } } /// Handles receiving a response for a range sync request that should have both blocks and /// blobs. - fn blobs_range_response( + fn range_block_and_blobs_response( &mut self, id: Id, peer_id: PeerId, - block_or_blob: BlockOrBlobs, + block_or_blob: BlockOrBlob, ) { if let Some((chain_id, resp)) = self .network @@ -822,11 +860,11 @@ impl SyncManager { /// Handles receiving a response for a Backfill sync request that should have both blocks and /// blobs. - fn blobs_backfill_response( + fn backfill_block_and_blobs_response( &mut self, id: Id, peer_id: PeerId, - block_or_blob: BlockOrBlobs, + block_or_blob: BlockOrBlob, ) { if let Some(resp) = self .network @@ -871,32 +909,6 @@ impl SyncManager { } } } - - fn rpc_blobs_received( - &mut self, - request_id: RequestId, - _peer_id: PeerId, - _maybe_blob: Option::EthSpec>>>, - _seen_timestamp: Duration, - ) { - match request_id { - RequestId::SingleBlock { .. } | RequestId::ParentLookup { .. } => { - unreachable!("There is no such thing as a singular 'by root' glob request that is not accompanied by the block") - } - RequestId::BackFillBlocks { .. } => { - unreachable!("An only blocks request does not receive sidecars") - } - RequestId::BackFillBlobs { .. } => { - unimplemented!("Adjust backfill sync"); - } - RequestId::RangeBlocks { .. } => { - unreachable!("Only-blocks range requests don't receive sidecars") - } - RequestId::RangeBlobs { id: _ } => { - unimplemented!("Adjust range"); - } - } - } } impl From>> for BlockProcessResult { diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 10f7f32955..974d8dbd8c 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -75,20 +75,20 @@ pub struct SyncNetworkContext { } /// Small enumeration to make dealing with block and blob requests easier. -pub enum BlockOrBlobs { +pub enum BlockOrBlob { Block(Option>>), - Blobs(Option>>), + Sidecar(Option>>), } -impl From>>> for BlockOrBlobs { +impl From>>> for BlockOrBlob { fn from(block: Option>>) -> Self { - BlockOrBlobs::Block(block) + BlockOrBlob::Block(block) } } -impl From>>> for BlockOrBlobs { +impl From>>> for BlockOrBlob { fn from(blob: Option>>) -> Self { - BlockOrBlobs::Blobs(blob) + BlockOrBlob::Sidecar(blob) } } @@ -311,15 +311,15 @@ impl SyncNetworkContext { pub fn range_sync_block_and_blob_response( &mut self, request_id: Id, - block_or_blob: BlockOrBlobs, + block_or_blob: BlockOrBlob, ) -> Option<(ChainId, BlocksAndBlobsByRangeResponse)> { match self.range_blocks_and_blobs_requests.entry(request_id) { Entry::Occupied(mut entry) => { let req = entry.get_mut(); let info = &mut req.block_blob_info; match block_or_blob { - BlockOrBlobs::Block(maybe_block) => info.add_block_response(maybe_block), - BlockOrBlobs::Blobs(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), + BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block), + BlockOrBlob::Sidecar(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), } if info.is_finished() { // If the request is finished, dequeue everything @@ -402,14 +402,14 @@ impl SyncNetworkContext { pub fn backfill_sync_block_and_blob_response( &mut self, request_id: Id, - block_or_blob: BlockOrBlobs, + block_or_blob: BlockOrBlob, ) -> Option> { match self.backfill_blocks_and_blobs_requests.entry(request_id) { Entry::Occupied(mut entry) => { let (_, info) = entry.get_mut(); match block_or_blob { - BlockOrBlobs::Block(maybe_block) => info.add_block_response(maybe_block), - BlockOrBlobs::Blobs(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), + BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block), + BlockOrBlob::Sidecar(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), } if info.is_finished() { // If the request is finished, dequeue everything From 1301c62436d261ea646a86125584d227a618254c Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Sat, 18 Mar 2023 00:29:25 +1100 Subject: [PATCH 2/2] Validator blob signing for the unblinded flow (#4096) * Implement validator blob signing (full block and full blob) * Fix compilation error and remove redundant slot check * Fix clippy error --- common/eth2/src/types.rs | 20 +- consensus/types/src/chain_spec.rs | 14 +- consensus/types/src/config_and_preset.rs | 2 +- consensus/types/src/signed_blob.rs | 4 + validator_client/src/block_service.rs | 241 +++++++++++++---------- validator_client/src/signing_method.rs | 6 + validator_client/src/validator_store.rs | 45 ++++- 7 files changed, 218 insertions(+), 114 deletions(-) diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index db64d74c2a..bc27ddb474 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -1326,7 +1326,7 @@ impl> Into> pub type BlockContentsTuple = ( SignedBeaconBlock, - Option, ::MaxBlobsPerBlock>>, + Option>, ); /// A wrapper over a [`SignedBeaconBlock`] or a [`SignedBeaconBlockAndBlobSidecars`]. @@ -1374,9 +1374,25 @@ impl> From> From> + for SignedBlockContents +{ + fn from(block_contents_tuple: BlockContentsTuple) -> Self { + match block_contents_tuple { + (signed_block, None) => SignedBlockContents::Block(signed_block), + (signed_block, Some(signed_blob_sidecars)) => { + SignedBlockContents::BlockAndBlobSidecars(SignedBeaconBlockAndBlobSidecars { + signed_block, + signed_blob_sidecars, + }) + } + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize, Encode)] #[serde(bound = "T: EthSpec")] pub struct SignedBeaconBlockAndBlobSidecars> { pub signed_block: SignedBeaconBlock, - pub signed_blob_sidecars: VariableList, ::MaxBlobsPerBlock>, + pub signed_blob_sidecars: SignedBlobSidecarList, } diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index 1f947c9e7b..c107f790c5 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -14,7 +14,7 @@ pub enum Domain { BlsToExecutionChange, BeaconProposer, BeaconAttester, - BlobsSideCar, + BlobSidecar, Randao, Deposit, VoluntaryExit, @@ -100,7 +100,7 @@ pub struct ChainSpec { */ pub(crate) domain_beacon_proposer: u32, pub(crate) domain_beacon_attester: u32, - pub(crate) domain_blobs_sidecar: u32, + pub(crate) domain_blob_sidecar: u32, pub(crate) domain_randao: u32, pub(crate) domain_deposit: u32, pub(crate) domain_voluntary_exit: u32, @@ -366,7 +366,7 @@ impl ChainSpec { match domain { Domain::BeaconProposer => self.domain_beacon_proposer, Domain::BeaconAttester => self.domain_beacon_attester, - Domain::BlobsSideCar => self.domain_blobs_sidecar, + Domain::BlobSidecar => self.domain_blob_sidecar, Domain::Randao => self.domain_randao, Domain::Deposit => self.domain_deposit, Domain::VoluntaryExit => self.domain_voluntary_exit, @@ -574,7 +574,7 @@ impl ChainSpec { domain_voluntary_exit: 4, domain_selection_proof: 5, domain_aggregate_and_proof: 6, - domain_blobs_sidecar: 10, // 0x0a000000 + domain_blob_sidecar: 11, // 0x0B000000 /* * Fork choice @@ -809,7 +809,7 @@ impl ChainSpec { domain_voluntary_exit: 4, domain_selection_proof: 5, domain_aggregate_and_proof: 6, - domain_blobs_sidecar: 10, + domain_blob_sidecar: 11, /* * Fork choice @@ -1285,7 +1285,7 @@ mod tests { test_domain(Domain::BeaconProposer, spec.domain_beacon_proposer, &spec); test_domain(Domain::BeaconAttester, spec.domain_beacon_attester, &spec); - test_domain(Domain::BlobsSideCar, spec.domain_blobs_sidecar, &spec); + test_domain(Domain::BlobSidecar, spec.domain_blob_sidecar, &spec); test_domain(Domain::Randao, spec.domain_randao, &spec); test_domain(Domain::Deposit, spec.domain_deposit, &spec); test_domain(Domain::VoluntaryExit, spec.domain_voluntary_exit, &spec); @@ -1311,7 +1311,7 @@ mod tests { &spec, ); - test_domain(Domain::BlobsSideCar, spec.domain_blobs_sidecar, &spec); + test_domain(Domain::BlobSidecar, spec.domain_blob_sidecar, &spec); } fn apply_bit_mask(domain_bytes: [u8; 4], spec: &ChainSpec) -> u32 { diff --git a/consensus/types/src/config_and_preset.rs b/consensus/types/src/config_and_preset.rs index ac93818b9c..957376c3d6 100644 --- a/consensus/types/src/config_and_preset.rs +++ b/consensus/types/src/config_and_preset.rs @@ -78,7 +78,7 @@ pub fn get_extra_fields(spec: &ChainSpec) -> HashMap { "bls_withdrawal_prefix".to_uppercase() => u8_hex(spec.bls_withdrawal_prefix_byte), "domain_beacon_proposer".to_uppercase() => u32_hex(spec.domain_beacon_proposer), "domain_beacon_attester".to_uppercase() => u32_hex(spec.domain_beacon_attester), - "domain_blobs_sidecar".to_uppercase() => u32_hex(spec.domain_blobs_sidecar), + "domain_blob_sidecar".to_uppercase() => u32_hex(spec.domain_blob_sidecar), "domain_randao".to_uppercase()=> u32_hex(spec.domain_randao), "domain_deposit".to_uppercase()=> u32_hex(spec.domain_deposit), "domain_voluntary_exit".to_uppercase() => u32_hex(spec.domain_voluntary_exit), diff --git a/consensus/types/src/signed_blob.rs b/consensus/types/src/signed_blob.rs index 4121b8b7f2..f9ae481247 100644 --- a/consensus/types/src/signed_blob.rs +++ b/consensus/types/src/signed_blob.rs @@ -2,6 +2,7 @@ use crate::{test_utils::TestRandom, BlobSidecar, EthSpec, Signature}; use derivative::Derivative; use serde_derive::{Deserialize, Serialize}; use ssz_derive::{Decode, Encode}; +use ssz_types::VariableList; use test_random_derive::TestRandom; use tree_hash_derive::TreeHash; @@ -25,3 +26,6 @@ pub struct SignedBlobSidecar { pub message: BlobSidecar, pub signature: Signature, } + +pub type SignedBlobSidecarList = + VariableList, ::MaxBlobsPerBlock>; diff --git a/validator_client/src/block_service.rs b/validator_client/src/block_service.rs index 0eb9a07c39..5fa32d3f42 100644 --- a/validator_client/src/block_service.rs +++ b/validator_client/src/block_service.rs @@ -6,9 +6,11 @@ use crate::{ OfflineOnFailure, }; use crate::{http_metrics::metrics, validator_store::ValidatorStore}; +use bls::SignatureBytes; use environment::RuntimeContext; -use eth2::types::SignedBlockContents; -use slog::{crit, debug, error, info, trace, warn}; +use eth2::types::{BlockContents, SignedBlockContents}; +use eth2::BeaconNodeHttpClient; +use slog::{crit, debug, error, info, trace, warn, Logger}; use slot_clock::SlotClock; use std::ops::Deref; use std::sync::Arc; @@ -16,8 +18,8 @@ use std::time::Duration; use tokio::sync::mpsc; use tokio::time::sleep; use types::{ - AbstractExecPayload, BeaconBlock, BlindedPayload, BlockType, EthSpec, FullPayload, Graffiti, - PublicKeyBytes, Slot, + AbstractExecPayload, BlindedPayload, BlockType, EthSpec, FullPayload, Graffiti, PublicKeyBytes, + Slot, }; #[derive(Debug)] @@ -342,80 +344,46 @@ impl BlockService { "slot" => slot.as_u64(), ); // Request block from first responsive beacon node. - let block = self + let block_contents = self .beacon_nodes .first_success( RequireSynced::No, OfflineOnFailure::Yes, - |beacon_node| async move { - let block: BeaconBlock = match Payload::block_type() { - BlockType::Full => { - let _get_timer = metrics::start_timer_vec( - &metrics::BLOCK_SERVICE_TIMES, - &[metrics::BEACON_BLOCK_HTTP_GET], - ); - beacon_node - .get_validator_blocks::( - slot, - randao_reveal_ref, - graffiti.as_ref(), - ) - .await - .map_err(|e| { - BlockError::Recoverable(format!( - "Error from beacon node when producing block: {:?}", - e - )) - })? - .data - .into() - } - BlockType::Blinded => { - let _get_timer = metrics::start_timer_vec( - &metrics::BLOCK_SERVICE_TIMES, - &[metrics::BLINDED_BEACON_BLOCK_HTTP_GET], - ); - beacon_node - .get_validator_blinded_blocks::( - slot, - randao_reveal_ref, - graffiti.as_ref(), - ) - .await - .map_err(|e| { - BlockError::Recoverable(format!( - "Error from beacon node when producing block: {:?}", - e - )) - })? - .data - } - }; - - info!( + move |beacon_node| { + Self::get_validator_block( + beacon_node, + slot, + randao_reveal_ref, + graffiti, + proposer_index, log, - "Received unsigned block"; - "slot" => slot.as_u64(), - ); - if proposer_index != Some(block.proposer_index()) { - return Err(BlockError::Recoverable( - "Proposer index does not match block proposer. Beacon chain re-orged" - .to_string(), - )); - } - - Ok::<_, BlockError>(block) + ) }, ) .await?; + let (block, maybe_blob_sidecars) = block_contents.deconstruct(); let signing_timer = metrics::start_timer(&metrics::BLOCK_SIGNING_TIMES); - let signed_block_contents: SignedBlockContents = self_ref + + let signed_block = self_ref .validator_store .sign_block::(*validator_pubkey_ref, block, current_slot) .await - .map_err(|e| BlockError::Recoverable(format!("Unable to sign block: {:?}", e)))? - .into(); + .map_err(|e| BlockError::Recoverable(format!("Unable to sign block: {:?}", e)))?; + + let maybe_signed_blobs = match maybe_blob_sidecars { + Some(blob_sidecars) => Some( + self_ref + .validator_store + .sign_blobs(*validator_pubkey_ref, blob_sidecars) + .await + .map_err(|e| { + BlockError::Recoverable(format!("Unable to sign blob: {:?}", e)) + })?, + ), + None => None, + }; + let signing_time_ms = Duration::from_secs_f64(signing_timer.map_or(0.0, |t| t.stop_and_record())).as_millis(); @@ -426,46 +394,19 @@ impl BlockService { "signing_time_ms" => signing_time_ms, ); + let signed_block_contents = SignedBlockContents::from((signed_block, maybe_signed_blobs)); + // Publish block with first available beacon node. self.beacon_nodes .first_success( RequireSynced::No, OfflineOnFailure::Yes, |beacon_node| async { - match Payload::block_type() { - BlockType::Full => { - let _post_timer = metrics::start_timer_vec( - &metrics::BLOCK_SERVICE_TIMES, - &[metrics::BEACON_BLOCK_HTTP_POST], - ); - beacon_node - .post_beacon_blocks(&signed_block_contents) - .await - .map_err(|e| { - BlockError::Irrecoverable(format!( - "Error from beacon node when publishing block: {:?}", - e - )) - })? - } - BlockType::Blinded => { - let _post_timer = metrics::start_timer_vec( - &metrics::BLOCK_SERVICE_TIMES, - &[metrics::BLINDED_BEACON_BLOCK_HTTP_POST], - ); - beacon_node - // TODO: need to be adjusted for blobs - .post_beacon_blinded_blocks(signed_block_contents.signed_block()) - .await - .map_err(|e| { - BlockError::Irrecoverable(format!( - "Error from beacon node when publishing block: {:?}", - e - )) - })? - } - } - Ok::<_, BlockError>(()) + Self::publish_signed_block_contents::( + &signed_block_contents, + beacon_node, + ) + .await }, ) .await?; @@ -482,4 +423,106 @@ impl BlockService { Ok(()) } + + async fn publish_signed_block_contents>( + signed_block_contents: &SignedBlockContents, + beacon_node: &BeaconNodeHttpClient, + ) -> Result<(), BlockError> { + match Payload::block_type() { + BlockType::Full => { + let _post_timer = metrics::start_timer_vec( + &metrics::BLOCK_SERVICE_TIMES, + &[metrics::BEACON_BLOCK_HTTP_POST], + ); + beacon_node + .post_beacon_blocks(signed_block_contents) + .await + .map_err(|e| { + BlockError::Irrecoverable(format!( + "Error from beacon node when publishing block: {:?}", + e + )) + })? + } + BlockType::Blinded => { + let _post_timer = metrics::start_timer_vec( + &metrics::BLOCK_SERVICE_TIMES, + &[metrics::BLINDED_BEACON_BLOCK_HTTP_POST], + ); + todo!("need to be adjusted for blobs"); + // beacon_node + // .post_beacon_blinded_blocks(signed_block_contents.signed_block()) + // .await + // .map_err(|e| { + // BlockError::Irrecoverable(format!( + // "Error from beacon node when publishing block: {:?}", + // e + // )) + // })? + } + } + Ok::<_, BlockError>(()) + } + + async fn get_validator_block>( + beacon_node: &BeaconNodeHttpClient, + slot: Slot, + randao_reveal_ref: &SignatureBytes, + graffiti: Option, + proposer_index: Option, + log: &Logger, + ) -> Result, BlockError> { + let block_contents: BlockContents = match Payload::block_type() { + BlockType::Full => { + let _get_timer = metrics::start_timer_vec( + &metrics::BLOCK_SERVICE_TIMES, + &[metrics::BEACON_BLOCK_HTTP_GET], + ); + beacon_node + .get_validator_blocks::(slot, randao_reveal_ref, graffiti.as_ref()) + .await + .map_err(|e| { + BlockError::Recoverable(format!( + "Error from beacon node when producing block: {:?}", + e + )) + })? + .data + } + BlockType::Blinded => { + let _get_timer = metrics::start_timer_vec( + &metrics::BLOCK_SERVICE_TIMES, + &[metrics::BLINDED_BEACON_BLOCK_HTTP_GET], + ); + todo!("implement blinded flow for blobs"); + // beacon_node + // .get_validator_blinded_blocks::( + // slot, + // randao_reveal_ref, + // graffiti.as_ref(), + // ) + // .await + // .map_err(|e| { + // BlockError::Recoverable(format!( + // "Error from beacon node when producing block: {:?}", + // e + // )) + // })? + // .data + } + }; + + info!( + log, + "Received unsigned block"; + "slot" => slot.as_u64(), + ); + if proposer_index != Some(block_contents.block().proposer_index()) { + return Err(BlockError::Recoverable( + "Proposer index does not match block proposer. Beacon chain re-orged".to_string(), + )); + } + + Ok::<_, BlockError>(block_contents) + } } diff --git a/validator_client/src/signing_method.rs b/validator_client/src/signing_method.rs index ae9df08096..e428bffcff 100644 --- a/validator_client/src/signing_method.rs +++ b/validator_client/src/signing_method.rs @@ -37,6 +37,7 @@ pub enum Error { pub enum SignableMessage<'a, T: EthSpec, Payload: AbstractExecPayload = FullPayload> { RandaoReveal(Epoch), BeaconBlock(&'a BeaconBlock), + BlobSidecar(&'a BlobSidecar), AttestationData(&'a AttestationData), SignedAggregateAndProof(&'a AggregateAndProof), SelectionProof(Slot), @@ -58,6 +59,7 @@ impl<'a, T: EthSpec, Payload: AbstractExecPayload> SignableMessage<'a, T, Pay match self { SignableMessage::RandaoReveal(epoch) => epoch.signing_root(domain), SignableMessage::BeaconBlock(b) => b.signing_root(domain), + SignableMessage::BlobSidecar(b) => b.signing_root(domain), SignableMessage::AttestationData(a) => a.signing_root(domain), SignableMessage::SignedAggregateAndProof(a) => a.signing_root(domain), SignableMessage::SelectionProof(slot) => slot.signing_root(domain), @@ -180,6 +182,10 @@ impl SigningMethod { Web3SignerObject::RandaoReveal { epoch } } SignableMessage::BeaconBlock(block) => Web3SignerObject::beacon_block(block)?, + SignableMessage::BlobSidecar(_) => { + // https://github.com/ConsenSys/web3signer/issues/726 + unimplemented!("Web3Signer blob signing not implemented.") + } SignableMessage::AttestationData(a) => Web3SignerObject::Attestation(a), SignableMessage::SignedAggregateAndProof(a) => { Web3SignerObject::AggregateAndProof(a) diff --git a/validator_client/src/validator_store.rs b/validator_client/src/validator_store.rs index 36a0d05734..294689e3c1 100644 --- a/validator_client/src/validator_store.rs +++ b/validator_client/src/validator_store.rs @@ -6,6 +6,7 @@ use crate::{ Config, }; use account_utils::{validator_definitions::ValidatorDefinition, ZeroizeString}; +use eth2::types::VariableList; use parking_lot::{Mutex, RwLock}; use slashing_protection::{ interchange::Interchange, InterchangeError, NotSafe, Safe, SlashingDatabase, @@ -19,11 +20,12 @@ use std::sync::Arc; use task_executor::TaskExecutor; use types::{ attestation::Error as AttestationError, graffiti::GraffitiString, AbstractExecPayload, Address, - AggregateAndProof, Attestation, BeaconBlock, BlindedPayload, ChainSpec, ContributionAndProof, - Domain, Epoch, EthSpec, Fork, Graffiti, Hash256, Keypair, PublicKeyBytes, SelectionProof, - Signature, SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedRoot, - SignedValidatorRegistrationData, Slot, SyncAggregatorSelectionData, SyncCommitteeContribution, - SyncCommitteeMessage, SyncSelectionProof, SyncSubnetId, ValidatorRegistrationData, + AggregateAndProof, Attestation, BeaconBlock, BlindedPayload, BlobSidecarList, ChainSpec, + ContributionAndProof, Domain, Epoch, EthSpec, Fork, Graffiti, Hash256, Keypair, PublicKeyBytes, + SelectionProof, Signature, SignedAggregateAndProof, SignedBeaconBlock, SignedBlobSidecar, + SignedBlobSidecarList, SignedContributionAndProof, SignedRoot, SignedValidatorRegistrationData, + Slot, SyncAggregatorSelectionData, SyncCommitteeContribution, SyncCommitteeMessage, + SyncSelectionProof, SyncSubnetId, ValidatorRegistrationData, }; use validator_dir::ValidatorDir; @@ -531,6 +533,39 @@ impl ValidatorStore { } } + pub async fn sign_blobs( + &self, + validator_pubkey: PublicKeyBytes, + blob_sidecars: BlobSidecarList, + ) -> Result, Error> { + let mut signed_blob_sidecars = Vec::new(); + + for blob_sidecar in blob_sidecars.into_iter() { + let slot = blob_sidecar.slot; + let signing_epoch = slot.epoch(E::slots_per_epoch()); + let signing_context = self.signing_context(Domain::BlobSidecar, signing_epoch); + let signing_method = self.doppelganger_checked_signing_method(validator_pubkey)?; + + let signature = signing_method + .get_signature::>( + SignableMessage::BlobSidecar(&blob_sidecar), + signing_context, + &self.spec, + &self.task_executor, + ) + .await?; + + metrics::inc_counter_vec(&metrics::SIGNED_BLOBS_TOTAL, &[metrics::SUCCESS]); + + signed_blob_sidecars.push(SignedBlobSidecar { + message: blob_sidecar, + signature, + }); + } + + Ok(VariableList::from(signed_blob_sidecars)) + } + pub async fn sign_attestation( &self, validator_pubkey: PublicKeyBytes,