From 4bfca8251dff0ceb622fea99bead74eeb1a2b3d8 Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Mon, 15 Jul 2024 20:51:59 +0200 Subject: [PATCH 1/8] Add range sync metrics to track efficiency (#6095) * Add more range sync metrics to track efficiency * Add ignored blocks metrics --- beacon_node/network/src/metrics.rs | 30 +++++++++ .../network_beacon_processor/sync_methods.rs | 14 +++-- .../network/src/sync/backfill_sync/mod.rs | 10 +-- beacon_node/network/src/sync/manager.rs | 5 +- .../network/src/sync/range_sync/batch.rs | 28 ++++++--- .../network/src/sync/range_sync/chain.rs | 62 +++++++++++++++++-- .../src/sync/range_sync/chain_collection.rs | 23 +++++-- .../network/src/sync/range_sync/range.rs | 7 +++ 8 files changed, 151 insertions(+), 28 deletions(-) diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index dbcc8fb9b4..f0dba8d965 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -237,6 +237,36 @@ lazy_static! { "Number of Syncing chains in range, per range type", &["range_type"] ); + pub static ref SYNCING_CHAINS_REMOVED: Result = try_create_int_counter_vec( + "sync_range_removed_chains_total", + "Total count of range syncing chains removed per range type", + &["range_type"] + ); + pub static ref SYNCING_CHAINS_ADDED: Result = try_create_int_counter_vec( + "sync_range_added_chains_total", + "Total count of range syncing chains added per range type", + &["range_type"] + ); + pub static ref SYNCING_CHAINS_DROPPED_BLOCKS: Result = try_create_int_counter_vec( + "sync_range_chains_dropped_blocks_total", + "Total count of dropped blocks when removing a syncing chain per range type", + &["range_type"] + ); + pub static ref SYNCING_CHAINS_IGNORED_BLOCKS: Result = try_create_int_counter_vec( + "sync_range_chains_ignored_blocks_total", + "Total count of ignored blocks when processing a syncing chain batch per chain type", + &["chain_type"] + ); + pub static ref SYNCING_CHAINS_PROCESSED_BATCHES: Result = try_create_int_counter_vec( + "sync_range_chains_processed_batches_total", + "Total count of processed batches in a syncing chain batch per chain type", + &["chain_type"] + ); + pub static ref SYNCING_CHAIN_BATCH_AWAITING_PROCESSING: Result = try_create_histogram_with_buckets( + "sync_range_chain_batch_awaiting_processing_seconds", + "Time range sync batches spend in AwaitingProcessing state", + Ok(vec![0.01,0.02,0.05,0.1,0.2,0.5,1.0,2.0,5.0,10.0,20.0]) + ); pub static ref SYNC_SINGLE_BLOCK_LOOKUPS: Result = try_create_int_gauge( "sync_single_block_lookups", "Number of single block lookups underway" diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index acd02ab6ad..68bd674514 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -326,7 +326,7 @@ impl NetworkBeaconProcessor { .process_blocks(downloaded_blocks.iter(), notify_execution_layer) .await { - (_, Ok(_)) => { + (imported_blocks, Ok(_)) => { debug!(self.log, "Batch processed"; "batch_epoch" => epoch, "first_block_slot" => start_slot, @@ -335,7 +335,8 @@ impl NetworkBeaconProcessor { "processed_blocks" => sent_blocks, "service"=> "sync"); BatchProcessResult::Success { - was_non_empty: sent_blocks > 0, + sent_blocks, + imported_blocks, } } (imported_blocks, Err(e)) => { @@ -349,7 +350,7 @@ impl NetworkBeaconProcessor { "service" => "sync"); match e.peer_action { Some(penalty) => BatchProcessResult::FaultyFailure { - imported_blocks: imported_blocks > 0, + imported_blocks, penalty, }, None => BatchProcessResult::NonFaultyFailure, @@ -368,7 +369,7 @@ impl NetworkBeaconProcessor { .sum::(); match self.process_backfill_blocks(downloaded_blocks) { - (_, Ok(_)) => { + (imported_blocks, Ok(_)) => { debug!(self.log, "Backfill batch processed"; "batch_epoch" => epoch, "first_block_slot" => start_slot, @@ -377,7 +378,8 @@ impl NetworkBeaconProcessor { "processed_blobs" => n_blobs, "service"=> "sync"); BatchProcessResult::Success { - was_non_empty: sent_blocks > 0, + sent_blocks, + imported_blocks, } } (_, Err(e)) => { @@ -390,7 +392,7 @@ impl NetworkBeaconProcessor { "service" => "sync"); match e.peer_action { Some(penalty) => BatchProcessResult::FaultyFailure { - imported_blocks: false, + imported_blocks: 0, penalty, }, None => BatchProcessResult::NonFaultyFailure, diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 8bcc95fd3c..dfb05da19b 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -528,7 +528,7 @@ impl BackFillSync { // result callback. This is done, because an empty batch could end a chain and the logic // for removing chains and checking completion is in the callback. - let blocks = match batch.start_processing() { + let (blocks, _) = match batch.start_processing() { Err(e) => { return self .fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)) @@ -615,13 +615,15 @@ impl BackFillSync { "batch_epoch" => batch_id, "peer" => %peer, "client" => %network.client_type(&peer)); match result { - BatchProcessResult::Success { was_non_empty } => { + BatchProcessResult::Success { + imported_blocks, .. + } => { if let Err(e) = batch.processing_completed(BatchProcessingResult::Success) { self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?; } // If the processed batch was not empty, we can validate previous unvalidated // blocks. - if *was_non_empty { + if *imported_blocks > 0 { self.advance_chain(network, batch_id); } @@ -677,7 +679,7 @@ impl BackFillSync { Ok(BatchOperationOutcome::Continue) => { // chain can continue. Check if it can be progressed - if *imported_blocks { + if *imported_blocks > 0 { // At least one block was successfully verified and imported, then we can be sure all // previous batches are valid and we only need to download the current failed // batch. diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 23c05a6e16..dd4fa56d53 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -156,11 +156,12 @@ pub enum BlockProcessingResult { pub enum BatchProcessResult { /// The batch was completed successfully. It carries whether the sent batch contained blocks. Success { - was_non_empty: bool, + sent_blocks: usize, + imported_blocks: usize, }, /// The batch processing failed. It carries whether the processing imported any block. FaultyFailure { - imported_blocks: bool, + imported_blocks: usize, penalty: PeerAction, }, NonFaultyFailure, diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index 6e377cc6cb..49e3ac3a81 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -5,6 +5,7 @@ use lighthouse_network::PeerId; use std::collections::HashSet; use std::hash::{Hash, Hasher}; use std::ops::Sub; +use std::time::{Duration, Instant}; use strum::Display; use types::{Epoch, EthSpec, Slot}; @@ -118,7 +119,7 @@ pub enum BatchState { /// The batch is being downloaded. Downloading(PeerId, Id), /// The batch has been completely downloaded and is ready for processing. - AwaitingProcessing(PeerId, Vec>), + AwaitingProcessing(PeerId, Vec>, Instant), /// The batch is being processed. Processing(Attempt), /// The batch was successfully processed and is waiting to be validated. @@ -210,13 +211,26 @@ impl BatchInfo { match &self.state { BatchState::AwaitingDownload | BatchState::Failed => None, BatchState::Downloading(peer_id, _) - | BatchState::AwaitingProcessing(peer_id, _) + | BatchState::AwaitingProcessing(peer_id, _, _) | BatchState::Processing(Attempt { peer_id, .. }) | BatchState::AwaitingValidation(Attempt { peer_id, .. }) => Some(peer_id), BatchState::Poisoned => unreachable!("Poisoned batch"), } } + /// Returns the count of stored pending blocks if in awaiting processing state + pub fn pending_blocks(&self) -> usize { + match &self.state { + BatchState::AwaitingProcessing(_, blocks, _) => blocks.len(), + BatchState::AwaitingDownload + | BatchState::Downloading { .. } + | BatchState::Processing { .. } + | BatchState::AwaitingValidation { .. } + | BatchState::Poisoned + | BatchState::Failed => 0, + } + } + /// Returns a BlocksByRange request associated with the batch. pub fn to_blocks_by_range_request(&self) -> (BlocksByRangeRequest, ByRangeRequestType) { ( @@ -293,7 +307,7 @@ impl BatchInfo { } let received = blocks.len(); - self.state = BatchState::AwaitingProcessing(peer, blocks); + self.state = BatchState::AwaitingProcessing(peer, blocks, Instant::now()); Ok(received) } BatchState::Poisoned => unreachable!("Poisoned batch"), @@ -365,11 +379,11 @@ impl BatchInfo { } } - pub fn start_processing(&mut self) -> Result>, WrongState> { + pub fn start_processing(&mut self) -> Result<(Vec>, Duration), WrongState> { match self.state.poison() { - BatchState::AwaitingProcessing(peer, blocks) => { + BatchState::AwaitingProcessing(peer, blocks, start_instant) => { self.state = BatchState::Processing(Attempt::new::(peer, &blocks)); - Ok(blocks) + Ok((blocks, start_instant.elapsed())) } BatchState::Poisoned => unreachable!("Poisoned batch"), other => { @@ -515,7 +529,7 @@ impl std::fmt::Debug for BatchState { }) => write!(f, "AwaitingValidation({})", peer_id), BatchState::AwaitingDownload => f.write_str("AwaitingDownload"), BatchState::Failed => f.write_str("Failed"), - BatchState::AwaitingProcessing(ref peer, ref blocks) => { + BatchState::AwaitingProcessing(ref peer, ref blocks, _) => { write!(f, "AwaitingProcessing({}, {} blocks)", peer, blocks.len()) } BatchState::Downloading(peer, request_id) => { diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index a735001fed..d63b2f95d8 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -1,4 +1,6 @@ use super::batch::{BatchInfo, BatchProcessingResult, BatchState}; +use super::RangeSyncType; +use crate::metrics; use crate::network_beacon_processor::ChainSegmentProcessId; use crate::sync::network_context::RangeRequestId; use crate::sync::{network_context::SyncNetworkContext, BatchOperationOutcome, BatchProcessResult}; @@ -11,6 +13,7 @@ use rand::{seq::SliceRandom, Rng}; use slog::{crit, debug, o, warn}; use std::collections::{btree_map::Entry, BTreeMap, HashSet}; use std::hash::{Hash, Hasher}; +use strum::IntoStaticStr; use types::{Epoch, EthSpec, Hash256, Slot}; /// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of @@ -53,6 +56,13 @@ pub struct KeepChain; pub type ChainId = u64; pub type BatchId = Epoch; +#[derive(Debug, Copy, Clone, IntoStaticStr)] +pub enum SyncingChainType { + Head, + Finalized, + Backfill, +} + /// A chain of blocks that need to be downloaded. Peers who claim to contain the target head /// root are grouped into the peer pool and queried for batches when downloading the /// chain. @@ -60,6 +70,9 @@ pub struct SyncingChain { /// A random id used to identify this chain. id: ChainId, + /// SyncingChain type + pub chain_type: SyncingChainType, + /// The start of the chain segment. Any epoch previous to this one has been validated. pub start_epoch: Epoch, @@ -126,6 +139,7 @@ impl SyncingChain { target_head_slot: Slot, target_head_root: Hash256, peer_id: PeerId, + chain_type: SyncingChainType, log: &slog::Logger, ) -> Self { let mut peers = FnvHashMap::default(); @@ -135,6 +149,7 @@ impl SyncingChain { SyncingChain { id, + chain_type, start_epoch, target_head_slot, target_head_root, @@ -171,6 +186,14 @@ impl SyncingChain { self.validated_batches * EPOCHS_PER_BATCH } + /// Returns the total count of pending blocks in all the batches of this chain + pub fn pending_blocks(&self) -> usize { + self.batches + .values() + .map(|batch| batch.pending_blocks()) + .sum() + } + /// Removes a peer from the chain. /// If the peer has active batches, those are considered failed and re-requested. pub fn remove_peer( @@ -305,7 +328,12 @@ impl SyncingChain { // result callback. This is done, because an empty batch could end a chain and the logic // for removing chains and checking completion is in the callback. - let blocks = batch.start_processing()?; + let (blocks, duration_in_awaiting_processing) = batch.start_processing()?; + metrics::observe_duration( + &metrics::SYNCING_CHAIN_BATCH_AWAITING_PROCESSING, + duration_in_awaiting_processing, + ); + let process_id = ChainSegmentProcessId::RangeBatchId(self.id, batch_id); self.current_processing_batch = Some(batch_id); @@ -469,10 +497,27 @@ impl SyncingChain { // We consider three cases. Batch was successfully processed, Batch failed processing due // to a faulty peer, or batch failed processing but the peer can't be deemed faulty. match result { - BatchProcessResult::Success { was_non_empty } => { + BatchProcessResult::Success { + sent_blocks, + imported_blocks, + } => { + if sent_blocks > imported_blocks { + let ignored_blocks = sent_blocks - imported_blocks; + metrics::inc_counter_vec_by( + &metrics::SYNCING_CHAINS_IGNORED_BLOCKS, + &[self.chain_type.into()], + ignored_blocks as u64, + ); + } + metrics::inc_counter_vec( + &metrics::SYNCING_CHAINS_PROCESSED_BATCHES, + &[self.chain_type.into()], + ); + batch.processing_completed(BatchProcessingResult::Success)?; - if *was_non_empty { + // was not empty = sent_blocks > 0 + if *sent_blocks > 0 { // If the processed batch was not empty, we can validate previous unvalidated // blocks. self.advance_chain(network, batch_id); @@ -515,7 +560,7 @@ impl SyncingChain { match batch.processing_completed(BatchProcessingResult::FaultyFailure)? { BatchOperationOutcome::Continue => { // Chain can continue. Check if it can be moved forward. - if *imported_blocks { + if *imported_blocks > 0 { // At least one block was successfully verified and imported, so we can be sure all // previous batches are valid and we only need to download the current failed // batch. @@ -1142,3 +1187,12 @@ impl RemoveChain { ) } } + +impl From for SyncingChainType { + fn from(value: RangeSyncType) -> Self { + match value { + RangeSyncType::Head => Self::Head, + RangeSyncType::Finalized => Self::Finalized, + } + } +} diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index 364514a358..3621a6605a 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -64,8 +64,8 @@ impl ChainCollection { /// Updates the Syncing state of the collection after a chain is removed. fn on_chain_removed(&mut self, id: &ChainId, was_syncing: bool, sync_type: RangeSyncType) { - let _ = metrics::get_int_gauge(&metrics::SYNCING_CHAINS_COUNT, &[sync_type.as_str()]) - .map(|m| m.dec()); + metrics::inc_counter_vec(&metrics::SYNCING_CHAINS_REMOVED, &[sync_type.as_str()]); + self.update_metrics(); match self.state { RangeSyncState::Finalized(ref syncing_id) => { @@ -493,15 +493,28 @@ impl ChainCollection { target_head_slot, target_head_root, peer, + sync_type.into(), &self.log, ); debug_assert_eq!(new_chain.get_id(), id); debug!(self.log, "New chain added to sync"; "peer_id" => peer_rpr, "sync_type" => ?sync_type, &new_chain); entry.insert(new_chain); - let _ = - metrics::get_int_gauge(&metrics::SYNCING_CHAINS_COUNT, &[sync_type.as_str()]) - .map(|m| m.inc()); + metrics::inc_counter_vec(&metrics::SYNCING_CHAINS_ADDED, &[sync_type.as_str()]); + self.update_metrics(); } } } + + fn update_metrics(&self) { + metrics::set_gauge_vec( + &metrics::SYNCING_CHAINS_COUNT, + &[RangeSyncType::Finalized.as_str()], + self.finalized_chains.len() as i64, + ); + metrics::set_gauge_vec( + &metrics::SYNCING_CHAINS_COUNT, + &[RangeSyncType::Head.as_str()], + self.head_chains.len() as i64, + ); + } } diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index fa06af2495..4213771d48 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -43,6 +43,7 @@ use super::block_storage::BlockStorage; use super::chain::{BatchId, ChainId, RemoveChain, SyncingChain}; use super::chain_collection::ChainCollection; use super::sync_type::RangeSyncType; +use crate::metrics; use crate::status::ToStatusMessage; use crate::sync::network_context::SyncNetworkContext; use crate::sync::BatchProcessResult; @@ -346,6 +347,12 @@ where } } + metrics::inc_counter_vec_by( + &metrics::SYNCING_CHAINS_DROPPED_BLOCKS, + &[sync_type.as_str()], + chain.pending_blocks() as u64, + ); + network.status_peers(self.beacon_chain.as_ref(), chain.peers()); let status = self.beacon_chain.status_message(); From 1d39aacfeb8543fe53233b82964fb7a6606a53ab Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 16 Jul 2024 04:52:02 +1000 Subject: [PATCH 2/8] Enable the outbound rate limiter by default, and update blobs method quotas (#6093) * Enable the outbound rate limiter by default, and update blobs method quotas. * Lint and book updates. --- .../lighthouse_network/src/rpc/config.rs | 9 ++++++-- beacon_node/src/cli.rs | 21 ++++++++++++------- beacon_node/src/config.rs | 19 ++++++++--------- book/src/help_bn.md | 8 ++----- 4 files changed, 31 insertions(+), 26 deletions(-) diff --git a/beacon_node/lighthouse_network/src/rpc/config.rs b/beacon_node/lighthouse_network/src/rpc/config.rs index 08b81c7eae..d17fa112a1 100644 --- a/beacon_node/lighthouse_network/src/rpc/config.rs +++ b/beacon_node/lighthouse_network/src/rpc/config.rs @@ -103,8 +103,13 @@ impl RateLimiterConfig { pub const DEFAULT_GOODBYE_QUOTA: Quota = Quota::one_every(10); pub const DEFAULT_BLOCKS_BY_RANGE_QUOTA: Quota = Quota::n_every(1024, 10); pub const DEFAULT_BLOCKS_BY_ROOT_QUOTA: Quota = Quota::n_every(128, 10); - pub const DEFAULT_BLOBS_BY_RANGE_QUOTA: Quota = Quota::n_every(768, 10); - pub const DEFAULT_BLOBS_BY_ROOT_QUOTA: Quota = Quota::n_every(128, 10); + // `BlocksByRange` and `BlobsByRange` are sent together during range sync. + // It makes sense for blocks and blobs quotas to be equivalent in terms of the number of blocks: + // 1024 blocks * 6 max blobs per block. + // This doesn't necessarily mean that we are sending this many blobs, because the quotas are + // measured against the maximum request size. + pub const DEFAULT_BLOBS_BY_RANGE_QUOTA: Quota = Quota::n_every(6144, 10); + pub const DEFAULT_BLOBS_BY_ROOT_QUOTA: Quota = Quota::n_every(768, 10); pub const DEFAULT_LIGHT_CLIENT_BOOTSTRAP_QUOTA: Quota = Quota::one_every(10); pub const DEFAULT_LIGHT_CLIENT_OPTIMISTIC_UPDATE_QUOTA: Quota = Quota::one_every(10); pub const DEFAULT_LIGHT_CLIENT_FINALITY_UPDATE_QUOTA: Quota = Quota::one_every(10); diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 40a343a7fe..c32c5e7ec6 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -372,17 +372,22 @@ pub fn cli_app() -> Command { .arg( Arg::new("self-limiter") .long("self-limiter") - .help( - "Enables the outbound rate limiter (requests made by this node). \ - Use the self-limiter-protocol flag to set per protocol configurations. \ - If the self rate limiter is enabled and a protocol is not \ - present in the configuration, the quotas used for the inbound rate limiter will be \ - used." - ) + .help("This flag is deprecated and has no effect.") + .hide(true) .action(ArgAction::SetTrue) .help_heading(FLAG_HEADER) .display_order(0) ) + .arg( + Arg::new("disable-self-limiter") + .long("disable-self-limiter") + .help( + "Disables the outbound rate limiter (requests sent by this node)." + ) + .action(ArgAction::SetTrue) + .help_heading(FLAG_HEADER) + .display_order(0) + ) .arg( Arg::new("self-limiter-protocols") .long("self-limiter-protocols") @@ -397,7 +402,7 @@ pub fn cli_app() -> Command { ) .action(ArgAction::Append) .value_delimiter(';') - .requires("self-limiter") + .conflicts_with("disable-self-limiter") .display_order(0) ) .arg( diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index b3c91631c0..f0d02f6c51 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -1416,16 +1416,15 @@ pub fn set_network_config( // Light client server config. config.enable_light_client_server = parse_flag(cli_args, "light-client-server"); - // The self limiter is disabled by default. If the `self-limiter` flag is provided - // without the `self-limiter-protocols` flag, the default params will be used. - if parse_flag(cli_args, "self-limiter") { - config.outbound_rate_limiter_config = - if let Some(protocols) = cli_args.get_one::("self-limiter-protocols") { - Some(protocols.parse()?) - } else { - Some(Default::default()) - }; - } + // The self limiter is enabled by default. If the `self-limiter-protocols` flag is not provided, + // the default params will be used. + config.outbound_rate_limiter_config = if parse_flag(cli_args, "disable-self-limiter") { + None + } else if let Some(protocols) = cli_args.get_one::("self-limiter-protocols") { + Some(protocols.parse()?) + } else { + Some(Default::default()) + }; // Proposer-only mode overrides a number of previous configuration parameters. // Specifically, we avoid subscribing to long-lived subnets and wish to maintain a minimal set diff --git a/book/src/help_bn.md b/book/src/help_bn.md index 8bbbd3eb6b..50484f5ec4 100644 --- a/book/src/help_bn.md +++ b/book/src/help_bn.md @@ -505,6 +505,8 @@ Flags: --disable-quic Disables the quic transport. The node will rely solely on the TCP transport for libp2p connections. + --disable-self-limiter + Disables the outbound rate limiter (requests sent by this node). --disable-upnp Disables UPnP support. Setting this will prevent Lighthouse from attempting to automatically establish external port mappings. @@ -575,12 +577,6 @@ Flags: When present, Lighthouse will forget the payload statuses of any already-imported blocks. This can assist in the recovery from a consensus failure caused by the execution layer. - --self-limiter - Enables the outbound rate limiter (requests made by this node). Use - the self-limiter-protocol flag to set per protocol configurations. If - the self rate limiter is enabled and a protocol is not present in the - configuration, the quotas used for the inbound rate limiter will be - used. --shutdown-after-sync Shutdown beacon node as soon as sync is completed. Backfill sync will not be performed before shutdown. From 13ffdd211dc116a4d0a5ada3776b133fe62b3041 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Mon, 15 Jul 2024 12:49:08 -0700 Subject: [PATCH 3/8] Beacon api + validator electra (#5744) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Attestation superstruct changes for EIP 7549 (#5644) * update * experiment * superstruct changes * revert * superstruct changes * fix tests * indexed attestation * indexed attestation superstruct * updated TODOs * `superstruct` the `AttesterSlashing` (#5636) * `superstruct` Attester Fork Variants * Push a little further * Deal with Encode / Decode of AttesterSlashing * not so sure about this.. * Stop Encode/Decode Bounds from Propagating Out * Tons of Changes.. * More Conversions to AttestationRef * Add AsReference trait (#15) * Add AsReference trait * Fix some snafus * Got it Compiling! :D * Got Tests Building * Get beacon chain tests compiling --------- Co-authored-by: Michael Sproul * Merge remote-tracking branch 'upstream/unstable' into electra_attestation_changes * Make EF Tests Fork-Agnostic (#5713) * Finish EF Test Fork Agnostic (#5714) * Superstruct `AggregateAndProof` (#5715) * Upgrade `superstruct` to `0.8.0` * superstruct `AggregateAndProof` * Merge remote-tracking branch 'sigp/unstable' into electra_attestation_changes * cargo fmt * Merge pull request #5726 from realbigsean/electra_attestation_changes Merge unstable into Electra attestation changes * process withdrawals updates * cleanup withdrawals processing * update `process_operations` deposit length check * add apply_deposit changes * add execution layer withdrawal request processing * process deposit receipts * add consolidation processing * update process operations function * exit updates * clean up * update slash_validator * EIP7549 `get_attestation_indices` (#5657) * get attesting indices electra impl * fmt * get tests to pass * fmt * fix some beacon chain tests * fmt * fix slasher test * fmt got me again * fix more tests * fix tests * Some small changes (#5739) * Merge branch 'electra_attestation_changes' of https://github.com/sigp/lighthouse into block-processing-electra * cargo fmt (#5740) * Merge branch 'electra_attestation_changes' of https://github.com/sigp/lighthouse into block-processing-electra * fix attestation verification * Add new engine api methods * Fix the versioning of v4 requests * Handle new engine api methods in mock EL * Note todo * Fix todos * Add support for electra fields in getPayloadBodies * Add comments for potential versioning confusion * udpates for aggregate attestation endpoint * Merge branch 'electra-engine-api' of https://github.com/sigp/lighthouse into beacon-api-electra * Sketch op pool changes * fix get attesting indices (#5742) * fix get attesting indices * better errors * fix compile * only get committee index once * Merge branch 'electra_attestation_changes' of https://github.com/sigp/lighthouse into block-processing-electra * Ef test fixes (#5753) * attestation related ef test fixes * delete commented out stuff * Merge branch 'electra_attestation_changes' of https://github.com/sigp/lighthouse into block-processing-electra * Merge branch 'block-processing-electra' of https://github.com/sigp/lighthouse into electra-engine-api * Merge branch 'electra-engine-api' of https://github.com/sigp/lighthouse into beacon-api-electra * Fix Aggregation Pool for Electra (#5754) * Fix Aggregation Pool for Electra * Remove Outdated Interface * fix ssz (#5755) * Get `electra_op_pool` up to date (#5756) * fix get attesting indices (#5742) * fix get attesting indices * better errors * fix compile * only get committee index once * Ef test fixes (#5753) * attestation related ef test fixes * delete commented out stuff * Fix Aggregation Pool for Electra (#5754) * Fix Aggregation Pool for Electra * Remove Outdated Interface * fix ssz (#5755) --------- Co-authored-by: realbigsean * Revert "Get `electra_op_pool` up to date (#5756)" (#5757) This reverts commit ab9e58aa3d0e6fe2175a4996a5de710e81152896. * Merge branch 'electra_attestation_changes' of https://github.com/sigp/lighthouse into electra_op_pool * Merge branch 'electra_attestation_changes' of https://github.com/sigp/lighthouse into block-processing-electra * Merge branch 'block-processing-electra' of https://github.com/sigp/lighthouse into electra-engine-api * Merge branch 'electra-engine-api' of https://github.com/sigp/lighthouse into beacon-api-electra * Compute on chain aggregate impl (#5752) * add compute_on_chain_agg impl to op pool changes * fmt * get op pool tests to pass * update beacon api aggregate attestationendpoint * update the naive agg pool interface (#5760) * Merge branch 'electra_attestation_changes' of https://github.com/sigp/lighthouse into block-processing-electra * Merge branch 'block-processing-electra' of https://github.com/sigp/lighthouse into electra-engine-api * Merge branch 'electra-engine-api' of https://github.com/sigp/lighthouse into beacon-api-electra * updates after merge * Fix bugs in cross-committee aggregation * Add comment to max cover optimisation * Fix assert * Electra epoch processing * add deposit limit for old deposit queue * Merge branch 'electra-epoch-proc' of https://github.com/sigp/lighthouse into electra-engine-api * Merge branch 'electra-engine-api' of https://github.com/sigp/lighthouse into beacon-api-electra * Merge pull request #5749 from sigp/electra_op_pool Optimise Electra op pool aggregation * don't fail on empty consolidations * Merge branch 'electra_attestation_changes' of https://github.com/sigp/lighthouse into block-processing-electra * Merge branch 'block-processing-electra' of https://github.com/sigp/lighthouse into electra-epoch-proc * Merge branch 'electra-epoch-proc' of https://github.com/sigp/lighthouse into electra-engine-api * Merge branch 'electra-engine-api' of https://github.com/sigp/lighthouse into beacon-api-electra * update committee offset * Merge branch 'electra_attestation_changes' of https://github.com/sigp/lighthouse into block-processing-electra * update committee offset * update committee offset * update committee offset * only increment the state deposit index on old deposit flow * Merge branch 'block-processing-electra' of https://github.com/sigp/lighthouse into electra-epoch-proc * Merge branch 'electra-epoch-proc' of https://github.com/sigp/lighthouse into electra-engine-api * Merge branch 'electra-engine-api' of https://github.com/sigp/lighthouse into beacon-api-electra * use correct max eb in epoch cache initialization * drop initiate validator ordering optimization * fix initiate exit for single pass * Merge branch 'electra-epoch-proc' of https://github.com/sigp/lighthouse into electra-engine-api * accept new payload v4 in mock el * Merge branch 'electra-engine-api' of https://github.com/sigp/lighthouse into beacon-api-electra * Fix Electra Fork Choice Tests (#5764) * Fix Electra Fork Choice Tests (#5764) * Fix Electra Fork Choice Tests (#5764) * Merge branch 'electra_attestation_changes' of https://github.com/sigp/lighthouse into block-processing-electra * Merge branch 'block-processing-electra' of https://github.com/sigp/lighthouse into electra-epoch-proc * Merge branch 'electra-epoch-proc' of https://github.com/sigp/lighthouse into electra-engine-api * Merge branch 'electra-engine-api' of https://github.com/sigp/lighthouse into beacon-api-electra * Fix Consolidation Sigs & Withdrawals * Merge pull request #5766 from ethDreamer/two_fixes Fix Consolidation Sigs & Withdrawals * Merge branches 'block-processing-electra' and 'electra-epoch-proc' of https://github.com/sigp/lighthouse into electra-epoch-proc * Merge branch 'electra-epoch-proc' of https://github.com/sigp/lighthouse into electra-engine-api * Merge branch 'electra-engine-api' of https://github.com/sigp/lighthouse into beacon-api-electra * Send unagg attestation based on fork * Fix ser/de * Merge branch 'electra-engine-api' into beacon-api-electra * Subscribe to the correct subnets for electra attestations (#5782) * subscribe to the correct att subnets for electra * subscribe to the correct att subnets for electra * cargo fmt * Subscribe to the correct subnets for electra attestations (#5782) * subscribe to the correct att subnets for electra * subscribe to the correct att subnets for electra * cargo fmt * Subscribe to the correct subnets for electra attestations (#5782) * subscribe to the correct att subnets for electra * subscribe to the correct att subnets for electra * cargo fmt * Subscribe to the correct subnets for electra attestations (#5782) * subscribe to the correct att subnets for electra * subscribe to the correct att subnets for electra * cargo fmt * Subscribe to the correct subnets for electra attestations (#5782) * subscribe to the correct att subnets for electra * subscribe to the correct att subnets for electra * cargo fmt * update electra readiness with new endpoints * fix slashing handling * Fix Bug In Block Processing with 0x02 Credentials * Merge remote-tracking branch 'upstream/unstable' * Send unagg attestation based on fork * Publish all aggregates * just one more check bro plz.. * Merge pull request #5832 from ethDreamer/electra_attestation_changes_merge_unstable Merge `unstable` into `electra_attestation_changes` * Merge pull request #5835 from realbigsean/fix-validator-logic Fix validator logic * Merge pull request #5816 from realbigsean/electra-attestation-slashing-handling Electra slashing handling * Merge branch 'electra_attestation_changes' of https://github.com/sigp/lighthouse into block-processing-electra * Merge branch 'block-processing-electra' of https://github.com/sigp/lighthouse into electra-epoch-proc * Merge branch 'electra-epoch-proc' of https://github.com/sigp/lighthouse into electra-engine-api * Merge branch 'electra-engine-api' of https://github.com/sigp/lighthouse into beacon-api-electra * fix: serde rename camle case for execution payload body (#5846) * Merge branch 'electra-engine-api' into beacon-api-electra * Electra attestation changes rm decode impl (#5856) * Remove Crappy Decode impl for Attestation * Remove Inefficient Attestation Decode impl * Implement Schema Upgrade / Downgrade * Update beacon_node/beacon_chain/src/schema_change/migration_schema_v20.rs Co-authored-by: Michael Sproul --------- Co-authored-by: Michael Sproul * Fix failing attestation tests and misc electra attestation cleanup (#5810) * - get attestation related beacon chain tests to pass - observed attestations are now keyed off of data + committee index - rename op pool attestationref to compactattestationref - remove unwraps in agg pool and use options instead - cherry pick some changes from ef-tests-electra * cargo fmt * fix failing test * Revert dockerfile changes * make committee_index return option * function args shouldnt be a ref to attestation ref * fmt * fix dup imports --------- Co-authored-by: realbigsean * fix some todos (#5817) * Merge branch 'unstable' of https://github.com/sigp/lighthouse into electra_attestation_changes * add consolidations to merkle calc for inclusion proof * Merge branch 'electra_attestation_changes' of https://github.com/sigp/lighthouse into block-processing-electra * Merge branch 'block-processing-electra' of https://github.com/sigp/lighthouse into electra-epoch-proc * Merge branch 'electra-epoch-proc' of https://github.com/sigp/lighthouse into electra-engine-api * Merge branch 'electra-engine-api' of https://github.com/sigp/lighthouse into beacon-api-electra * Remove Duplicate KZG Commitment Merkle Proof Code (#5874) * Remove Duplicate KZG Commitment Merkle Proof Code * s/tree_lists/fields/ * Merge branch 'unstable' of https://github.com/sigp/lighthouse into electra_attestation_changes * Merge branch 'electra_attestation_changes' of https://github.com/sigp/lighthouse into block-processing-electra * Merge branch 'block-processing-electra' of https://github.com/sigp/lighthouse into electra-epoch-proc * Merge branch 'electra-epoch-proc' of https://github.com/sigp/lighthouse into electra-engine-api * Merge branch 'electra-engine-api' of https://github.com/sigp/lighthouse into beacon-api-electra * fix compile * Merge branch 'electra_attestation_changes' of https://github.com/sigp/lighthouse into block-processing-electra * Merge branch 'block-processing-electra' of https://github.com/sigp/lighthouse into electra-epoch-proc * Merge branch 'electra-epoch-proc' of https://github.com/sigp/lighthouse into electra-engine-api * Merge branch 'electra-engine-api' of https://github.com/sigp/lighthouse into beacon-api-electra * Fix slasher tests (#5906) * Fix electra tests * Add electra attestations to double vote tests * Update superstruct to 0.8 * Merge remote-tracking branch 'origin/unstable' into electra_attestation_changes * Small cleanup in slasher tests * Clean up Electra observed aggregates (#5929) * Use consistent key in observed_attestations * Remove unwraps from observed aggregates * Merge branch 'unstable' of https://github.com/sigp/lighthouse into electra_attestation_changes * De-dup attestation constructor logic * Remove unwraps in Attestation construction * Dedup match_attestation_data * Remove outdated TODO * Use ForkName Ord in fork-choice tests * Use ForkName Ord in BeaconBlockBody * Make to_electra not fallible * Remove TestRandom impl for IndexedAttestation * Remove IndexedAttestation faulty Decode impl * Drop TestRandom impl * Add PendingAttestationInElectra * Indexed att on disk (#35) * indexed att on disk * fix lints * Update slasher/src/migrate.rs Co-authored-by: ethDreamer <37123614+ethDreamer@users.noreply.github.com> --------- Co-authored-by: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Co-authored-by: ethDreamer <37123614+ethDreamer@users.noreply.github.com> * add electra fork enabled fn to ForkName impl (#36) * add electra fork enabled fn to ForkName impl * remove inadvertent file * Update common/eth2/src/types.rs Co-authored-by: ethDreamer <37123614+ethDreamer@users.noreply.github.com> * Dedup attestation constructor logic in attester cache * Use if let Ok for committee_bits * Dedup Attestation constructor code * Diff reduction in tests * Fix beacon_chain tests * Diff reduction * Use Ord for ForkName in pubsub * Resolve into_attestation_and_indices todo * Remove stale TODO * Fix beacon_chain tests * Test spec invariant * Use electra_enabled in pubsub * Remove get_indexed_attestation_from_signed_aggregate * Use ok_or instead of if let else * committees are sorted * remove dup method `get_indexed_attestation_from_committees` * Merge pull request #5940 from dapplion/electra_attestation_changes_lionreview Electra attestations #5712 review * update default persisted op pool deserialization * ensure aggregate and proof uses serde untagged on ref * Fork aware ssz static attestation tests * Electra attestation changes from Lions review (#5971) * dedup/cleanup and remove unneeded hashset use * remove irrelevant TODOs * Merge branch 'unstable' of https://github.com/sigp/lighthouse into electra_attestation_changes * Merge branch 'electra_attestation_changes' of https://github.com/sigp/lighthouse into block-processing-electra * Merge branch 'block-processing-electra' of https://github.com/sigp/lighthouse into electra-epoch-proc * Merge branch 'electra-epoch-proc' of https://github.com/sigp/lighthouse into electra-engine-api * Merge branch 'electra-engine-api' of https://github.com/sigp/lighthouse into beacon-api-electra * Fix Compilation Break * Merge pull request #5973 from ethDreamer/beacon-api-electra Fix Compilation Break * Electra attestation changes sean review (#5972) * instantiate empty bitlist in unreachable code * clean up error conversion * fork enabled bool cleanup * remove a couple todos * return bools instead of options in `aggregate` and use the result * delete commented out code * use map macros in simple transformations * remove signers_disjoint_from * get ef tests compiling * get ef tests compiling * update intentionally excluded files * Avoid changing slasher schema for Electra * Delete slasher schema v4 * Fix clippy * Fix compilation of beacon_chain tests * Update database.rs * Update per_block_processing.rs * Add electra lightclient types * Update slasher/src/database.rs * fix imports * Merge pull request #5980 from dapplion/electra-lightclient Add electra lightclient types * Merge pull request #5975 from michaelsproul/electra-slasher-no-migration Avoid changing slasher schema for Electra * Update beacon_node/beacon_chain/src/attestation_verification.rs * Update beacon_node/beacon_chain/src/attestation_verification.rs * Merge branch 'unstable' of https://github.com/sigp/lighthouse into electra_attestation_changes * Merge branch 'electra_attestation_changes' of https://github.com/realbigsean/lighthouse into block-processing-electra * Merge branch 'block-processing-electra' of https://github.com/sigp/lighthouse into electra-epoch-proc * Merge branch 'electra-epoch-proc' of https://github.com/sigp/lighthouse into electra-engine-api * Merge branch 'electra-engine-api' of https://github.com/sigp/lighthouse into beacon-api-electra * The great renaming receipt -> request * Address some more review comments * Merge branch 'unstable' of https://github.com/sigp/lighthouse into electra-engine-api * Update beacon_node/beacon_chain/src/electra_readiness.rs * Update consensus/types/src/chain_spec.rs * update GET requests * update POST requests * add client updates and test updates * Merge branch 'unstable' of https://github.com/sigp/lighthouse into electra-engine-api * Merge branch 'electra-engine-api' of https://github.com/sigp/lighthouse into beacon-api-electra * compile after merge * unwrap -> unwrap_err * self review * fix tests * convert op pool messages to electra in electra * remove methods to post without content header * filter instead of convert --- beacon_node/beacon_chain/src/eth1_chain.rs | 12 +- beacon_node/http_api/src/lib.rs | 202 +++++++--- beacon_node/http_api/tests/fork_tests.rs | 13 +- .../http_api/tests/interactive_tests.rs | 3 +- beacon_node/http_api/tests/tests.rs | 362 +++++++++++++++--- common/eth2/src/lib.rs | 218 +++++++++-- common/eth2/src/types.rs | 2 + consensus/types/src/attestation.rs | 48 ++- consensus/types/src/attester_slashing.rs | 23 ++ validator_client/src/attestation_service.rs | 108 ++++-- 10 files changed, 817 insertions(+), 174 deletions(-) diff --git a/beacon_node/beacon_chain/src/eth1_chain.rs b/beacon_node/beacon_chain/src/eth1_chain.rs index ee50e3b384..62aad558ee 100644 --- a/beacon_node/beacon_chain/src/eth1_chain.rs +++ b/beacon_node/beacon_chain/src/eth1_chain.rs @@ -546,12 +546,20 @@ impl Eth1ChainBackend for CachingEth1Backend { state.eth1_data().deposit_count }; - match deposit_index.cmp(&deposit_count) { + // [New in Electra:EIP6110] + let deposit_index_limit = + if let Ok(deposit_receipts_start_index) = state.deposit_requests_start_index() { + std::cmp::min(deposit_count, deposit_receipts_start_index) + } else { + deposit_count + }; + + match deposit_index.cmp(&deposit_index_limit) { Ordering::Greater => Err(Error::DepositIndexTooHigh), Ordering::Equal => Ok(vec![]), Ordering::Less => { let next = deposit_index; - let last = std::cmp::min(deposit_count, next + E::MaxDeposits::to_u64()); + let last = std::cmp::min(deposit_index_limit, next + E::MaxDeposits::to_u64()); self.core .deposits() diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 7c8f64a722..2d017d6539 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -31,6 +31,7 @@ mod validators; mod version; use crate::produce_block::{produce_blinded_block_v2, produce_block_v2, produce_block_v3}; +use crate::version::fork_versioned_response; use beacon_chain::{ attestation_verification::VerifiedAttestation, observed_operations::ObservationOutcome, validator_monitor::timestamp_now, AttestationError as AttnError, BeaconChain, BeaconChainError, @@ -256,12 +257,15 @@ pub fn prometheus_metrics() -> warp::filters::log::Log( ); // GET beacon/blocks/{block_id}/attestations - let get_beacon_block_attestations = beacon_blocks_path_v1 + let get_beacon_block_attestations = beacon_blocks_path_any .clone() .and(warp::path("attestations")) .and(warp::path::end()) .then( - |block_id: BlockId, + |endpoint_version: EndpointVersion, + block_id: BlockId, task_spawner: TaskSpawner, chain: Arc>| { - task_spawner.blocking_json_task(Priority::P1, move || { + task_spawner.blocking_response_task(Priority::P1, move || { let (block, execution_optimistic, finalized) = block_id.blinded_block(&chain)?; - Ok(api_types::GenericResponse::from( - block - .message() - .body() - .attestations() - .map(|att| att.clone_as_attestation()) - .collect::>(), - ) - .add_execution_optimistic_finalized(execution_optimistic, finalized)) + let fork_name = block + .fork_name(&chain.spec) + .map_err(inconsistent_fork_rejection)?; + let atts = block + .message() + .body() + .attestations() + .map(|att| att.clone_as_attestation()) + .collect::>(); + let res = execution_optimistic_finalized_fork_versioned_response( + endpoint_version, + fork_name, + execution_optimistic, + finalized, + &atts, + )?; + Ok(add_consensus_version_header( + warp::reply::json(&res).into_response(), + fork_name, + )) }) }, ); @@ -1750,8 +1766,14 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()); + let beacon_pool_path_any = any_version + .and(warp::path("beacon")) + .and(warp::path("pool")) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()); + // POST beacon/pool/attestations - let post_beacon_pool_attestations = beacon_pool_path + let post_beacon_pool_attestations = beacon_pool_path_any .clone() .and(warp::path("attestations")) .and(warp::path::end()) @@ -1760,7 +1782,11 @@ pub fn serve( .and(reprocess_send_filter) .and(log_filter.clone()) .then( - |task_spawner: TaskSpawner, + // V1 and V2 are identical except V2 has a consensus version header in the request. + // We only require this header for SSZ deserialization, which isn't supported for + // this endpoint presently. + |_endpoint_version: EndpointVersion, + task_spawner: TaskSpawner, chain: Arc>, attestations: Vec>, network_tx: UnboundedSender>, @@ -1781,16 +1807,17 @@ pub fn serve( ); // GET beacon/pool/attestations?committee_index,slot - let get_beacon_pool_attestations = beacon_pool_path + let get_beacon_pool_attestations = beacon_pool_path_any .clone() .and(warp::path("attestations")) .and(warp::path::end()) .and(warp::query::()) .then( - |task_spawner: TaskSpawner, + |endpoint_version: EndpointVersion, + task_spawner: TaskSpawner, chain: Arc>, query: api_types::AttestationPoolQuery| { - task_spawner.blocking_json_task(Priority::P1, move || { + task_spawner.blocking_response_task(Priority::P1, move || { let query_filter = |data: &AttestationData| { query.slot.map_or(true, |slot| slot == data.slot) && query @@ -1807,20 +1834,48 @@ pub fn serve( .filter(|&att| query_filter(att.data())) .cloned(), ); - Ok(api_types::GenericResponse::from(attestations)) + // Use the current slot to find the fork version, and convert all messages to the + // current fork's format. This is to ensure consistent message types matching + // `Eth-Consensus-Version`. + let current_slot = + chain + .slot_clock + .now() + .ok_or(warp_utils::reject::custom_server_error( + "unable to read slot clock".to_string(), + ))?; + let fork_name = chain.spec.fork_name_at_slot::(current_slot); + let attestations = attestations + .into_iter() + .filter(|att| { + (fork_name.electra_enabled() && matches!(att, Attestation::Electra(_))) + || (!fork_name.electra_enabled() + && matches!(att, Attestation::Base(_))) + }) + .collect::>(); + + let res = fork_versioned_response(endpoint_version, fork_name, &attestations)?; + Ok(add_consensus_version_header( + warp::reply::json(&res).into_response(), + fork_name, + )) }) }, ); // POST beacon/pool/attester_slashings - let post_beacon_pool_attester_slashings = beacon_pool_path + let post_beacon_pool_attester_slashings = beacon_pool_path_any .clone() .and(warp::path("attester_slashings")) .and(warp::path::end()) .and(warp_utils::json::json()) .and(network_tx_filter.clone()) .then( - |task_spawner: TaskSpawner, + // V1 and V2 are identical except V2 has a consensus version header in the request. + // We only require this header for SSZ deserialization, which isn't supported for + // this endpoint presently. + |_endpoint_version: EndpointVersion, + task_spawner: TaskSpawner, chain: Arc>, slashing: AttesterSlashing, network_tx: UnboundedSender>| { @@ -1857,18 +1912,45 @@ pub fn serve( ); // GET beacon/pool/attester_slashings - let get_beacon_pool_attester_slashings = beacon_pool_path - .clone() - .and(warp::path("attester_slashings")) - .and(warp::path::end()) - .then( - |task_spawner: TaskSpawner, chain: Arc>| { - task_spawner.blocking_json_task(Priority::P1, move || { - let attestations = chain.op_pool.get_all_attester_slashings(); - Ok(api_types::GenericResponse::from(attestations)) - }) - }, - ); + let get_beacon_pool_attester_slashings = + beacon_pool_path_any + .clone() + .and(warp::path("attester_slashings")) + .and(warp::path::end()) + .then( + |endpoint_version: EndpointVersion, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_response_task(Priority::P1, move || { + let slashings = chain.op_pool.get_all_attester_slashings(); + + // Use the current slot to find the fork version, and convert all messages to the + // current fork's format. This is to ensure consistent message types matching + // `Eth-Consensus-Version`. + let current_slot = chain.slot_clock.now().ok_or( + warp_utils::reject::custom_server_error( + "unable to read slot clock".to_string(), + ), + )?; + let fork_name = chain.spec.fork_name_at_slot::(current_slot); + let slashings = slashings + .into_iter() + .filter(|slashing| { + (fork_name.electra_enabled() + && matches!(slashing, AttesterSlashing::Electra(_))) + || (!fork_name.electra_enabled() + && matches!(slashing, AttesterSlashing::Base(_))) + }) + .collect::>(); + + let res = fork_versioned_response(endpoint_version, fork_name, &slashings)?; + Ok(add_consensus_version_header( + warp::reply::json(&res).into_response(), + fork_name, + )) + }) + }, + ); // POST beacon/pool/proposer_slashings let post_beacon_pool_proposer_slashings = beacon_pool_path @@ -3175,7 +3257,7 @@ pub fn serve( ); // GET validator/aggregate_attestation?attestation_data_root,slot - let get_validator_aggregate_attestation = eth_v1 + let get_validator_aggregate_attestation = any_version .and(warp::path("validator")) .and(warp::path("aggregate_attestation")) .and(warp::path::end()) @@ -3184,29 +3266,45 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .then( - |query: api_types::ValidatorAggregateAttestationQuery, + |endpoint_version: EndpointVersion, + query: api_types::ValidatorAggregateAttestationQuery, not_synced_filter: Result<(), Rejection>, task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P0, move || { not_synced_filter?; - chain - .get_pre_electra_aggregated_attestation_by_slot_and_root( + let res = if endpoint_version == V2 { + let Some(committee_index) = query.committee_index else { + return Err(warp_utils::reject::custom_bad_request( + "missing committee index".to_string(), + )); + }; + chain.get_aggregated_attestation_electra( + query.slot, + &query.attestation_data_root, + committee_index, + ) + } else if endpoint_version == V1 { + // Do nothing + chain.get_pre_electra_aggregated_attestation_by_slot_and_root( query.slot, &query.attestation_data_root, ) - .map_err(|e| { - warp_utils::reject::custom_bad_request(format!( - "unable to fetch aggregate: {:?}", - e - )) - })? - .map(api_types::GenericResponse::from) - .ok_or_else(|| { - warp_utils::reject::custom_not_found( - "no matching aggregate found".to_string(), - ) - }) + } else { + return Err(unsupported_version_rejection(endpoint_version)); + }; + res.map_err(|e| { + warp_utils::reject::custom_bad_request(format!( + "unable to fetch aggregate: {:?}", + e + )) + })? + .map(api_types::GenericResponse::from) + .ok_or_else(|| { + warp_utils::reject::custom_not_found( + "no matching aggregate found".to_string(), + ) + }) }) }, ); @@ -3302,7 +3400,7 @@ pub fn serve( ); // POST validator/aggregate_and_proofs - let post_validator_aggregate_and_proofs = eth_v1 + let post_validator_aggregate_and_proofs = any_version .and(warp::path("validator")) .and(warp::path("aggregate_and_proofs")) .and(warp::path::end()) @@ -3313,7 +3411,11 @@ pub fn serve( .and(network_tx_filter.clone()) .and(log_filter.clone()) .then( - |not_synced_filter: Result<(), Rejection>, + // V1 and V2 are identical except V2 has a consensus version header in the request. + // We only require this header for SSZ deserialization, which isn't supported for + // this endpoint presently. + |_endpoint_version: EndpointVersion, + not_synced_filter: Result<(), Rejection>, task_spawner: TaskSpawner, chain: Arc>, aggregates: Vec>, diff --git a/beacon_node/http_api/tests/fork_tests.rs b/beacon_node/http_api/tests/fork_tests.rs index 1e20280da1..db8a0ab2b5 100644 --- a/beacon_node/http_api/tests/fork_tests.rs +++ b/beacon_node/http_api/tests/fork_tests.rs @@ -150,8 +150,13 @@ async fn attestations_across_fork_with_skip_slots() { .collect::>(); assert!(!unaggregated_attestations.is_empty()); + let fork_name = harness.spec.fork_name_at_slot::(fork_slot); client - .post_beacon_pool_attestations(&unaggregated_attestations) + .post_beacon_pool_attestations_v1(&unaggregated_attestations) + .await + .unwrap(); + client + .post_beacon_pool_attestations_v2(&unaggregated_attestations, fork_name) .await .unwrap(); @@ -162,7 +167,11 @@ async fn attestations_across_fork_with_skip_slots() { assert!(!signed_aggregates.is_empty()); client - .post_validator_aggregate_and_proof(&signed_aggregates) + .post_validator_aggregate_and_proof_v1(&signed_aggregates) + .await + .unwrap(); + client + .post_validator_aggregate_and_proof_v2(&signed_aggregates, fork_name) .await .unwrap(); } diff --git a/beacon_node/http_api/tests/interactive_tests.rs b/beacon_node/http_api/tests/interactive_tests.rs index 14673d23e1..2f417cf7ba 100644 --- a/beacon_node/http_api/tests/interactive_tests.rs +++ b/beacon_node/http_api/tests/interactive_tests.rs @@ -893,9 +893,10 @@ async fn queue_attestations_from_http() { .flat_map(|attestations| attestations.into_iter().map(|(att, _subnet)| att)) .collect::>(); + let fork_name = tester.harness.spec.fork_name_at_slot::(attestation_slot); let attestation_future = tokio::spawn(async move { client - .post_beacon_pool_attestations(&attestations) + .post_beacon_pool_attestations_v2(&attestations, fork_name) .await .expect("attestations should be processed successfully") }); diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index cb4ce34682..f511f25d32 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -1668,7 +1668,7 @@ impl ApiTester { for block_id in self.interesting_block_ids() { let result = self .client - .get_beacon_blocks_attestations(block_id.0) + .get_beacon_blocks_attestations_v2(block_id.0) .await .unwrap() .map(|res| res.data); @@ -1699,9 +1699,9 @@ impl ApiTester { self } - pub async fn test_post_beacon_pool_attestations_valid(mut self) -> Self { + pub async fn test_post_beacon_pool_attestations_valid_v1(mut self) -> Self { self.client - .post_beacon_pool_attestations(self.attestations.as_slice()) + .post_beacon_pool_attestations_v1(self.attestations.as_slice()) .await .unwrap(); @@ -1713,7 +1713,25 @@ impl ApiTester { self } - pub async fn test_post_beacon_pool_attestations_invalid(mut self) -> Self { + pub async fn test_post_beacon_pool_attestations_valid_v2(mut self) -> Self { + let fork_name = self + .attestations + .first() + .map(|att| self.chain.spec.fork_name_at_slot::(att.data().slot)) + .unwrap(); + self.client + .post_beacon_pool_attestations_v2(self.attestations.as_slice(), fork_name) + .await + .unwrap(); + assert!( + self.network_rx.network_recv.recv().await.is_some(), + "valid attestation should be sent to network" + ); + + self + } + + pub async fn test_post_beacon_pool_attestations_invalid_v1(mut self) -> Self { let mut attestations = Vec::new(); for attestation in &self.attestations { let mut invalid_attestation = attestation.clone(); @@ -1726,7 +1744,7 @@ impl ApiTester { let err = self .client - .post_beacon_pool_attestations(attestations.as_slice()) + .post_beacon_pool_attestations_v1(attestations.as_slice()) .await .unwrap_err(); @@ -1749,6 +1767,48 @@ impl ApiTester { self } + pub async fn test_post_beacon_pool_attestations_invalid_v2(mut self) -> Self { + let mut attestations = Vec::new(); + for attestation in &self.attestations { + let mut invalid_attestation = attestation.clone(); + invalid_attestation.data_mut().slot += 1; + + // add both to ensure we only fail on invalid attestations + attestations.push(attestation.clone()); + attestations.push(invalid_attestation); + } + + let fork_name = self + .attestations + .first() + .map(|att| self.chain.spec.fork_name_at_slot::(att.data().slot)) + .unwrap(); + + let err_v2 = self + .client + .post_beacon_pool_attestations_v2(attestations.as_slice(), fork_name) + .await + .unwrap_err(); + + match err_v2 { + Error::ServerIndexedMessage(IndexedErrorMessage { + code, + message: _, + failures, + }) => { + assert_eq!(code, 400); + assert_eq!(failures.len(), self.attestations.len()); + } + _ => panic!("query did not fail correctly"), + } + + assert!( + self.network_rx.network_recv.recv().await.is_some(), + "if some attestations are valid, we should send them to the network" + ); + + self + } pub async fn test_get_beacon_light_client_bootstrap(self) -> Self { let block_id = BlockId(CoreBlockId::Finalized); @@ -1812,7 +1872,7 @@ impl ApiTester { pub async fn test_get_beacon_pool_attestations(self) -> Self { let result = self .client - .get_beacon_pool_attestations(None, None) + .get_beacon_pool_attestations_v1(None, None) .await .unwrap() .data; @@ -1822,12 +1882,20 @@ impl ApiTester { assert_eq!(result, expected); + let result = self + .client + .get_beacon_pool_attestations_v2(None, None) + .await + .unwrap() + .data; + assert_eq!(result, expected); + self } - pub async fn test_post_beacon_pool_attester_slashings_valid(mut self) -> Self { + pub async fn test_post_beacon_pool_attester_slashings_valid_v1(mut self) -> Self { self.client - .post_beacon_pool_attester_slashings(&self.attester_slashing) + .post_beacon_pool_attester_slashings_v1(&self.attester_slashing) .await .unwrap(); @@ -1839,7 +1907,25 @@ impl ApiTester { self } - pub async fn test_post_beacon_pool_attester_slashings_invalid(mut self) -> Self { + pub async fn test_post_beacon_pool_attester_slashings_valid_v2(mut self) -> Self { + let fork_name = self + .chain + .spec + .fork_name_at_slot::(self.attester_slashing.attestation_1().data().slot); + self.client + .post_beacon_pool_attester_slashings_v2(&self.attester_slashing, fork_name) + .await + .unwrap(); + + assert!( + self.network_rx.network_recv.recv().await.is_some(), + "valid attester slashing should be sent to network" + ); + + self + } + + pub async fn test_post_beacon_pool_attester_slashings_invalid_v1(mut self) -> Self { let mut slashing = self.attester_slashing.clone(); match &mut slashing { AttesterSlashing::Base(ref mut slashing) => { @@ -1851,7 +1937,35 @@ impl ApiTester { } self.client - .post_beacon_pool_attester_slashings(&slashing) + .post_beacon_pool_attester_slashings_v1(&slashing) + .await + .unwrap_err(); + + assert!( + self.network_rx.network_recv.recv().now_or_never().is_none(), + "invalid attester slashing should not be sent to network" + ); + + self + } + + pub async fn test_post_beacon_pool_attester_slashings_invalid_v2(mut self) -> Self { + let mut slashing = self.attester_slashing.clone(); + match &mut slashing { + AttesterSlashing::Base(ref mut slashing) => { + slashing.attestation_1.data.slot += 1; + } + AttesterSlashing::Electra(ref mut slashing) => { + slashing.attestation_1.data.slot += 1; + } + } + + let fork_name = self + .chain + .spec + .fork_name_at_slot::(self.attester_slashing.attestation_1().data().slot); + self.client + .post_beacon_pool_attester_slashings_v2(&slashing, fork_name) .await .unwrap_err(); @@ -1866,7 +1980,7 @@ impl ApiTester { pub async fn test_get_beacon_pool_attester_slashings(self) -> Self { let result = self .client - .get_beacon_pool_attester_slashings() + .get_beacon_pool_attester_slashings_v1() .await .unwrap() .data; @@ -1875,6 +1989,14 @@ impl ApiTester { assert_eq!(result, expected); + let result = self + .client + .get_beacon_pool_attester_slashings_v2() + .await + .unwrap() + .data; + assert_eq!(result, expected); + self } @@ -3233,30 +3355,52 @@ impl ApiTester { } pub async fn test_get_validator_aggregate_attestation(self) -> Self { - let attestation = self + if self .chain - .head_beacon_block() - .message() - .body() - .attestations() - .next() - .unwrap() - .clone_as_attestation(); + .spec + .fork_name_at_slot::(self.chain.slot().unwrap()) + .electra_enabled() + { + for attestation in self.chain.naive_aggregation_pool.read().iter() { + let result = self + .client + .get_validator_aggregate_attestation_v2( + attestation.data().slot, + attestation.data().tree_hash_root(), + attestation.committee_index().unwrap(), + ) + .await + .unwrap() + .unwrap() + .data; + let expected = attestation; - let result = self - .client - .get_validator_aggregate_attestation( - attestation.data().slot, - attestation.data().tree_hash_root(), - ) - .await - .unwrap() - .unwrap() - .data; + assert_eq!(&result, expected); + } + } else { + let attestation = self + .chain + .head_beacon_block() + .message() + .body() + .attestations() + .next() + .unwrap() + .clone_as_attestation(); + let result = self + .client + .get_validator_aggregate_attestation_v1( + attestation.data().slot, + attestation.data().tree_hash_root(), + ) + .await + .unwrap() + .unwrap() + .data; + let expected = attestation; - let expected = attestation; - - assert_eq!(result, expected); + assert_eq!(result, expected); + } self } @@ -3355,11 +3499,11 @@ impl ApiTester { ) } - pub async fn test_get_validator_aggregate_and_proofs_valid(mut self) -> Self { + pub async fn test_get_validator_aggregate_and_proofs_valid_v1(mut self) -> Self { let aggregate = self.get_aggregate().await; self.client - .post_validator_aggregate_and_proof::(&[aggregate]) + .post_validator_aggregate_and_proof_v1::(&[aggregate]) .await .unwrap(); @@ -3368,7 +3512,7 @@ impl ApiTester { self } - pub async fn test_get_validator_aggregate_and_proofs_invalid(mut self) -> Self { + pub async fn test_get_validator_aggregate_and_proofs_invalid_v1(mut self) -> Self { let mut aggregate = self.get_aggregate().await; match &mut aggregate { SignedAggregateAndProof::Base(ref mut aggregate) => { @@ -3380,7 +3524,7 @@ impl ApiTester { } self.client - .post_validator_aggregate_and_proof::(&[aggregate]) + .post_validator_aggregate_and_proof_v1::(&[aggregate.clone()]) .await .unwrap_err(); @@ -3389,6 +3533,46 @@ impl ApiTester { self } + pub async fn test_get_validator_aggregate_and_proofs_valid_v2(mut self) -> Self { + let aggregate = self.get_aggregate().await; + let fork_name = self + .chain + .spec + .fork_name_at_slot::(aggregate.message().aggregate().data().slot); + self.client + .post_validator_aggregate_and_proof_v2::(&[aggregate], fork_name) + .await + .unwrap(); + + assert!(self.network_rx.network_recv.recv().await.is_some()); + + self + } + + pub async fn test_get_validator_aggregate_and_proofs_invalid_v2(mut self) -> Self { + let mut aggregate = self.get_aggregate().await; + match &mut aggregate { + SignedAggregateAndProof::Base(ref mut aggregate) => { + aggregate.message.aggregate.data.slot += 1; + } + SignedAggregateAndProof::Electra(ref mut aggregate) => { + aggregate.message.aggregate.data.slot += 1; + } + } + + let fork_name = self + .chain + .spec + .fork_name_at_slot::(aggregate.message().aggregate().data().slot); + self.client + .post_validator_aggregate_and_proof_v2::(&[aggregate], fork_name) + .await + .unwrap_err(); + assert!(self.network_rx.network_recv.recv().now_or_never().is_none()); + + self + } + pub async fn test_get_validator_beacon_committee_subscriptions(mut self) -> Self { let subscription = BeaconCommitteeSubscription { validator_index: 0, @@ -3484,7 +3668,7 @@ impl ApiTester { pub async fn test_post_validator_register_validator_slashed(self) -> Self { // slash a validator self.client - .post_beacon_pool_attester_slashings(&self.attester_slashing) + .post_beacon_pool_attester_slashings_v1(&self.attester_slashing) .await .unwrap(); @@ -3597,7 +3781,7 @@ impl ApiTester { // Attest to the current slot self.client - .post_beacon_pool_attestations(self.attestations.as_slice()) + .post_beacon_pool_attestations_v1(self.attestations.as_slice()) .await .unwrap(); @@ -5237,7 +5421,7 @@ impl ApiTester { // Attest to the current slot self.client - .post_beacon_pool_attestations(self.attestations.as_slice()) + .post_beacon_pool_attestations_v1(self.attestations.as_slice()) .await .unwrap(); @@ -5292,7 +5476,7 @@ impl ApiTester { let expected_attestation_len = self.attestations.len(); self.client - .post_beacon_pool_attestations(self.attestations.as_slice()) + .post_beacon_pool_attestations_v1(self.attestations.as_slice()) .await .unwrap(); @@ -5801,34 +5985,66 @@ async fn post_beacon_blocks_duplicate() { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn beacon_pools_post_attestations_valid() { +async fn beacon_pools_post_attestations_valid_v1() { ApiTester::new() .await - .test_post_beacon_pool_attestations_valid() + .test_post_beacon_pool_attestations_valid_v1() .await; } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn beacon_pools_post_attestations_invalid() { +async fn beacon_pools_post_attestations_invalid_v1() { ApiTester::new() .await - .test_post_beacon_pool_attestations_invalid() + .test_post_beacon_pool_attestations_invalid_v1() .await; } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn beacon_pools_post_attester_slashings_valid() { +async fn beacon_pools_post_attestations_valid_v2() { ApiTester::new() .await - .test_post_beacon_pool_attester_slashings_valid() + .test_post_beacon_pool_attestations_valid_v2() .await; } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn beacon_pools_post_attester_slashings_invalid() { +async fn beacon_pools_post_attestations_invalid_v2() { ApiTester::new() .await - .test_post_beacon_pool_attester_slashings_invalid() + .test_post_beacon_pool_attestations_invalid_v2() + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn beacon_pools_post_attester_slashings_valid_v1() { + ApiTester::new() + .await + .test_post_beacon_pool_attester_slashings_valid_v1() + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn beacon_pools_post_attester_slashings_invalid_v1() { + ApiTester::new() + .await + .test_post_beacon_pool_attester_slashings_invalid_v1() + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn beacon_pools_post_attester_slashings_valid_v2() { + ApiTester::new() + .await + .test_post_beacon_pool_attester_slashings_valid_v2() + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn beacon_pools_post_attester_slashings_invalid_v2() { + ApiTester::new() + .await + .test_post_beacon_pool_attester_slashings_invalid_v2() .await; } @@ -6156,36 +6372,70 @@ async fn get_validator_aggregate_attestation_with_skip_slots() { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn get_validator_aggregate_and_proofs_valid() { +async fn get_validator_aggregate_and_proofs_valid_v1() { ApiTester::new() .await - .test_get_validator_aggregate_and_proofs_valid() + .test_get_validator_aggregate_and_proofs_valid_v1() .await; } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn get_validator_aggregate_and_proofs_valid_with_skip_slots() { +async fn get_validator_aggregate_and_proofs_valid_with_skip_slots_v1() { ApiTester::new() .await .skip_slots(E::slots_per_epoch() * 2) - .test_get_validator_aggregate_and_proofs_valid() + .test_get_validator_aggregate_and_proofs_valid_v1() .await; } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn get_validator_aggregate_and_proofs_invalid() { +async fn get_validator_aggregate_and_proofs_valid_v2() { ApiTester::new() .await - .test_get_validator_aggregate_and_proofs_invalid() + .test_get_validator_aggregate_and_proofs_valid_v2() .await; } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn get_validator_aggregate_and_proofs_invalid_with_skip_slots() { +async fn get_validator_aggregate_and_proofs_valid_with_skip_slots_v2() { ApiTester::new() .await .skip_slots(E::slots_per_epoch() * 2) - .test_get_validator_aggregate_and_proofs_invalid() + .test_get_validator_aggregate_and_proofs_valid_v2() + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn get_validator_aggregate_and_proofs_invalid_v1() { + ApiTester::new() + .await + .test_get_validator_aggregate_and_proofs_invalid_v1() + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn get_validator_aggregate_and_proofs_invalid_with_skip_slots_v1() { + ApiTester::new() + .await + .skip_slots(E::slots_per_epoch() * 2) + .test_get_validator_aggregate_and_proofs_invalid_v1() + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn get_validator_aggregate_and_proofs_invalid_v2() { + ApiTester::new() + .await + .test_get_validator_aggregate_and_proofs_invalid_v2() + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn get_validator_aggregate_and_proofs_invalid_with_skip_slots_v2() { + ApiTester::new() + .await + .skip_slots(E::slots_per_epoch() * 2) + .test_get_validator_aggregate_and_proofs_invalid_v2() .await; } diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 5a51aaec5a..6d000f576f 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -346,6 +346,19 @@ impl BeaconNodeHttpClient { Ok(()) } + /// Perform a HTTP POST request with a custom timeout and consensus header. + async fn post_with_timeout_and_consensus_header( + &self, + url: U, + body: &T, + timeout: Duration, + fork_name: ForkName, + ) -> Result<(), Error> { + self.post_generic_with_consensus_version(url, body, Some(timeout), fork_name) + .await?; + Ok(()) + } + /// Perform a HTTP POST request with a custom timeout, returning a JSON response. async fn post_with_timeout_and_response( &self, @@ -376,25 +389,6 @@ impl BeaconNodeHttpClient { ok_or_error(response).await } - /// Generic POST function supporting arbitrary responses and timeouts. - /// Does not include Content-Type application/json in the request header. - async fn post_generic_json_without_content_type_header( - &self, - url: U, - body: &T, - timeout: Option, - ) -> Result { - let mut builder = self.client.post(url); - if let Some(timeout) = timeout { - builder = builder.timeout(timeout); - } - - let serialized_body = serde_json::to_vec(body).map_err(Error::InvalidJson)?; - - let response = builder.body(serialized_body).send().await?; - ok_or_error(response).await - } - /// Generic POST function supporting arbitrary responses and timeouts. async fn post_generic_with_consensus_version( &self, @@ -1228,10 +1222,10 @@ impl BeaconNodeHttpClient { self.get_opt(path).await } - /// `GET beacon/blocks/{block_id}/attestations` + /// `GET v1/beacon/blocks/{block_id}/attestations` /// /// Returns `Ok(None)` on a 404 error. - pub async fn get_beacon_blocks_attestations( + pub async fn get_beacon_blocks_attestations_v1( &self, block_id: BlockId, ) -> Result>>>, Error> { @@ -1247,8 +1241,28 @@ impl BeaconNodeHttpClient { self.get_opt(path).await } - /// `POST beacon/pool/attestations` - pub async fn post_beacon_pool_attestations( + /// `GET v2/beacon/blocks/{block_id}/attestations` + /// + /// Returns `Ok(None)` on a 404 error. + pub async fn get_beacon_blocks_attestations_v2( + &self, + block_id: BlockId, + ) -> Result>>>, Error> + { + let mut path = self.eth_path(V2)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("blocks") + .push(&block_id.to_string()) + .push("attestations"); + + self.get_opt(path).await + } + + /// `POST v1/beacon/pool/attestations` + pub async fn post_beacon_pool_attestations_v1( &self, attestations: &[Attestation], ) -> Result<(), Error> { @@ -1266,8 +1280,33 @@ impl BeaconNodeHttpClient { Ok(()) } - /// `GET beacon/pool/attestations?slot,committee_index` - pub async fn get_beacon_pool_attestations( + /// `POST v2/beacon/pool/attestations` + pub async fn post_beacon_pool_attestations_v2( + &self, + attestations: &[Attestation], + fork_name: ForkName, + ) -> Result<(), Error> { + let mut path = self.eth_path(V2)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("pool") + .push("attestations"); + + self.post_with_timeout_and_consensus_header( + path, + &attestations, + self.timeouts.attestation, + fork_name, + ) + .await?; + + Ok(()) + } + + /// `GET v1/beacon/pool/attestations?slot,committee_index` + pub async fn get_beacon_pool_attestations_v1( &self, slot: Option, committee_index: Option, @@ -1293,8 +1332,35 @@ impl BeaconNodeHttpClient { self.get(path).await } - /// `POST beacon/pool/attester_slashings` - pub async fn post_beacon_pool_attester_slashings( + /// `GET v2/beacon/pool/attestations?slot,committee_index` + pub async fn get_beacon_pool_attestations_v2( + &self, + slot: Option, + committee_index: Option, + ) -> Result>>, Error> { + let mut path = self.eth_path(V2)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("pool") + .push("attestations"); + + if let Some(slot) = slot { + path.query_pairs_mut() + .append_pair("slot", &slot.to_string()); + } + + if let Some(index) = committee_index { + path.query_pairs_mut() + .append_pair("committee_index", &index.to_string()); + } + + self.get(path).await + } + + /// `POST v1/beacon/pool/attester_slashings` + pub async fn post_beacon_pool_attester_slashings_v1( &self, slashing: &AttesterSlashing, ) -> Result<(), Error> { @@ -1306,14 +1372,33 @@ impl BeaconNodeHttpClient { .push("pool") .push("attester_slashings"); - self.post_generic_json_without_content_type_header(path, slashing, None) + self.post_generic(path, slashing, None).await?; + + Ok(()) + } + + /// `POST v2/beacon/pool/attester_slashings` + pub async fn post_beacon_pool_attester_slashings_v2( + &self, + slashing: &AttesterSlashing, + fork_name: ForkName, + ) -> Result<(), Error> { + let mut path = self.eth_path(V2)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("pool") + .push("attester_slashings"); + + self.post_generic_with_consensus_version(path, slashing, None, fork_name) .await?; Ok(()) } - /// `GET beacon/pool/attester_slashings` - pub async fn get_beacon_pool_attester_slashings( + /// `GET v1/beacon/pool/attester_slashings` + pub async fn get_beacon_pool_attester_slashings_v1( &self, ) -> Result>>, Error> { let mut path = self.eth_path(V1)?; @@ -1327,6 +1412,21 @@ impl BeaconNodeHttpClient { self.get(path).await } + /// `GET v2/beacon/pool/attester_slashings` + pub async fn get_beacon_pool_attester_slashings_v2( + &self, + ) -> Result>>, Error> { + let mut path = self.eth_path(V2)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("pool") + .push("attester_slashings"); + + self.get(path).await + } + /// `POST beacon/pool/proposer_slashings` pub async fn post_beacon_pool_proposer_slashings( &self, @@ -2216,8 +2316,8 @@ impl BeaconNodeHttpClient { self.get_with_timeout(path, self.timeouts.attestation).await } - /// `GET validator/aggregate_attestation?slot,attestation_data_root` - pub async fn get_validator_aggregate_attestation( + /// `GET v1/validator/aggregate_attestation?slot,attestation_data_root` + pub async fn get_validator_aggregate_attestation_v1( &self, slot: Slot, attestation_data_root: Hash256, @@ -2240,6 +2340,32 @@ impl BeaconNodeHttpClient { .await } + /// `GET v2/validator/aggregate_attestation?slot,attestation_data_root,committee_index` + pub async fn get_validator_aggregate_attestation_v2( + &self, + slot: Slot, + attestation_data_root: Hash256, + committee_index: CommitteeIndex, + ) -> Result>>, Error> { + let mut path = self.eth_path(V2)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("validator") + .push("aggregate_attestation"); + + path.query_pairs_mut() + .append_pair("slot", &slot.to_string()) + .append_pair( + "attestation_data_root", + &format!("{:?}", attestation_data_root), + ) + .append_pair("committee_index", &committee_index.to_string()); + + self.get_opt_with_timeout(path, self.timeouts.attestation) + .await + } + /// `GET validator/sync_committee_contribution` pub async fn get_validator_sync_committee_contribution( &self, @@ -2335,8 +2461,8 @@ impl BeaconNodeHttpClient { .await } - /// `POST validator/aggregate_and_proofs` - pub async fn post_validator_aggregate_and_proof( + /// `POST v1/validator/aggregate_and_proofs` + pub async fn post_validator_aggregate_and_proof_v1( &self, aggregates: &[SignedAggregateAndProof], ) -> Result<(), Error> { @@ -2353,6 +2479,30 @@ impl BeaconNodeHttpClient { Ok(()) } + /// `POST v2/validator/aggregate_and_proofs` + pub async fn post_validator_aggregate_and_proof_v2( + &self, + aggregates: &[SignedAggregateAndProof], + fork_name: ForkName, + ) -> Result<(), Error> { + let mut path = self.eth_path(V2)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("validator") + .push("aggregate_and_proofs"); + + self.post_with_timeout_and_consensus_header( + path, + &aggregates, + self.timeouts.attestation, + fork_name, + ) + .await?; + + Ok(()) + } + /// `POST validator/beacon_committee_subscriptions` pub async fn post_validator_beacon_committee_subscriptions( &self, diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 70aa5aab3e..d399bc2bd0 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -780,6 +780,8 @@ pub struct ValidatorAttestationDataQuery { pub struct ValidatorAggregateAttestationQuery { pub attestation_data_root: Hash256, pub slot: Slot, + #[serde(skip_serializing_if = "Option::is_none")] + pub committee_index: Option, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)] diff --git a/consensus/types/src/attestation.rs b/consensus/types/src/attestation.rs index 88993267a9..7b53a98caa 100644 --- a/consensus/types/src/attestation.rs +++ b/consensus/types/src/attestation.rs @@ -1,6 +1,6 @@ use crate::slot_data::SlotData; -use crate::Checkpoint; use crate::{test_utils::TestRandom, Hash256, Slot}; +use crate::{Checkpoint, ForkVersionDeserialize}; use derivative::Derivative; use safe_arith::ArithError; use serde::{Deserialize, Serialize}; @@ -26,6 +26,12 @@ pub enum Error { InvalidCommitteeIndex, } +impl From for Error { + fn from(e: ssz_types::Error) -> Self { + Error::SszTypesError(e) + } +} + #[superstruct( variants(Base, Electra), variant_attributes( @@ -487,6 +493,46 @@ impl<'a, E: EthSpec> From> for AttestationRef<'a, E> } } +impl ForkVersionDeserialize for Attestation { + fn deserialize_by_fork<'de, D: serde::Deserializer<'de>>( + value: serde_json::Value, + fork_name: crate::ForkName, + ) -> Result { + if fork_name.electra_enabled() { + let attestation: AttestationElectra = + serde_json::from_value(value).map_err(serde::de::Error::custom)?; + Ok(Attestation::Electra(attestation)) + } else { + let attestation: AttestationBase = + serde_json::from_value(value).map_err(serde::de::Error::custom)?; + Ok(Attestation::Base(attestation)) + } + } +} + +impl ForkVersionDeserialize for Vec> { + fn deserialize_by_fork<'de, D: serde::Deserializer<'de>>( + value: serde_json::Value, + fork_name: crate::ForkName, + ) -> Result { + if fork_name.electra_enabled() { + let attestations: Vec> = + serde_json::from_value(value).map_err(serde::de::Error::custom)?; + Ok(attestations + .into_iter() + .map(Attestation::Electra) + .collect::>()) + } else { + let attestations: Vec> = + serde_json::from_value(value).map_err(serde::de::Error::custom)?; + Ok(attestations + .into_iter() + .map(Attestation::Base) + .collect::>()) + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/consensus/types/src/attester_slashing.rs b/consensus/types/src/attester_slashing.rs index c8e2fb4f82..f6aa654d44 100644 --- a/consensus/types/src/attester_slashing.rs +++ b/consensus/types/src/attester_slashing.rs @@ -171,6 +171,29 @@ impl TestRandom for AttesterSlashing { } } +impl crate::ForkVersionDeserialize for Vec> { + fn deserialize_by_fork<'de, D: serde::Deserializer<'de>>( + value: serde_json::Value, + fork_name: crate::ForkName, + ) -> Result { + if fork_name.electra_enabled() { + let slashings: Vec> = + serde_json::from_value(value).map_err(serde::de::Error::custom)?; + Ok(slashings + .into_iter() + .map(AttesterSlashing::Electra) + .collect::>()) + } else { + let slashings: Vec> = + serde_json::from_value(value).map_err(serde::de::Error::custom)?; + Ok(slashings + .into_iter() + .map(AttesterSlashing::Base) + .collect::>()) + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/validator_client/src/attestation_service.rs b/validator_client/src/attestation_service.rs index 0cff39546d..30fe508a2c 100644 --- a/validator_client/src/attestation_service.rs +++ b/validator_client/src/attestation_service.rs @@ -287,17 +287,21 @@ impl AttestationService { // Then download, sign and publish a `SignedAggregateAndProof` for each // validator that is elected to aggregate for this `slot` and // `committee_index`. - self.produce_and_publish_aggregates(&attestation_data, &validator_duties) - .await - .map_err(move |e| { - crit!( - log, - "Error during attestation routine"; - "error" => format!("{:?}", e), - "committee_index" => committee_index, - "slot" => slot.as_u64(), - ) - })?; + self.produce_and_publish_aggregates( + &attestation_data, + committee_index, + &validator_duties, + ) + .await + .map_err(move |e| { + crit!( + log, + "Error during attestation routine"; + "error" => format!("{:?}", e), + "committee_index" => committee_index, + "slot" => slot.as_u64(), + ) + })?; } Ok(()) @@ -445,6 +449,11 @@ impl AttestationService { warn!(log, "No attestations were published"); return Ok(None); } + let fork_name = self + .context + .eth2_config + .spec + .fork_name_at_slot::(attestation_data.slot); // Post the attestations to the BN. match self @@ -458,9 +467,15 @@ impl AttestationService { &metrics::ATTESTATION_SERVICE_TIMES, &[metrics::ATTESTATIONS_HTTP_POST], ); - beacon_node - .post_beacon_pool_attestations(attestations) - .await + if fork_name.electra_enabled() { + beacon_node + .post_beacon_pool_attestations_v2(attestations, fork_name) + .await + } else { + beacon_node + .post_beacon_pool_attestations_v1(attestations) + .await + } }, ) .await @@ -504,6 +519,7 @@ impl AttestationService { async fn produce_and_publish_aggregates( &self, attestation_data: &AttestationData, + committee_index: CommitteeIndex, validator_duties: &[DutyAndProof], ) -> Result<(), String> { let log = self.context.log(); @@ -516,6 +532,12 @@ impl AttestationService { return Ok(()); } + let fork_name = self + .context + .eth2_config + .spec + .fork_name_at_slot::(attestation_data.slot); + let aggregated_attestation = &self .beacon_nodes .first_success( @@ -526,17 +548,36 @@ impl AttestationService { &metrics::ATTESTATION_SERVICE_TIMES, &[metrics::AGGREGATES_HTTP_GET], ); - beacon_node - .get_validator_aggregate_attestation( - attestation_data.slot, - attestation_data.tree_hash_root(), - ) - .await - .map_err(|e| { - format!("Failed to produce an aggregate attestation: {:?}", e) - })? - .ok_or_else(|| format!("No aggregate available for {:?}", attestation_data)) - .map(|result| result.data) + if fork_name.electra_enabled() { + beacon_node + .get_validator_aggregate_attestation_v2( + attestation_data.slot, + attestation_data.tree_hash_root(), + committee_index, + ) + .await + .map_err(|e| { + format!("Failed to produce an aggregate attestation: {:?}", e) + })? + .ok_or_else(|| { + format!("No aggregate available for {:?}", attestation_data) + }) + .map(|result| result.data) + } else { + beacon_node + .get_validator_aggregate_attestation_v1( + attestation_data.slot, + attestation_data.tree_hash_root(), + ) + .await + .map_err(|e| { + format!("Failed to produce an aggregate attestation: {:?}", e) + })? + .ok_or_else(|| { + format!("No aggregate available for {:?}", attestation_data) + }) + .map(|result| result.data) + } }, ) .await @@ -604,9 +645,20 @@ impl AttestationService { &metrics::ATTESTATION_SERVICE_TIMES, &[metrics::AGGREGATES_HTTP_POST], ); - beacon_node - .post_validator_aggregate_and_proof(signed_aggregate_and_proofs_slice) - .await + if fork_name.electra_enabled() { + beacon_node + .post_validator_aggregate_and_proof_v2( + signed_aggregate_and_proofs_slice, + fork_name, + ) + .await + } else { + beacon_node + .post_validator_aggregate_and_proof_v1( + signed_aggregate_and_proofs_slice, + ) + .await + } }, ) .await From 6856134ded14d8986c4ddd40abd3b26f87f09924 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 16 Jul 2024 11:19:00 +1000 Subject: [PATCH 4/8] Bump default `logfile-max-number` to 10 (#6092) * Bump default `logfile-max-number` to 10. * Bump default `logfile-max-number` to 10. * Fix docs --- book/src/help_bn.md | 2 +- book/src/help_general.md | 2 +- book/src/help_vc.md | 2 +- book/src/help_vm.md | 2 +- book/src/help_vm_create.md | 2 +- book/src/help_vm_import.md | 2 +- book/src/help_vm_move.md | 2 +- lighthouse/src/main.rs | 2 +- 8 files changed, 8 insertions(+), 8 deletions(-) diff --git a/book/src/help_bn.md b/book/src/help_bn.md index 50484f5ec4..5288b6a1de 100644 --- a/book/src/help_bn.md +++ b/book/src/help_bn.md @@ -241,7 +241,7 @@ Options: [possible values: DEFAULT, JSON] --logfile-max-number The maximum number of log files that will be stored. If set to 0, - background file logging is disabled. [default: 5] + background file logging is disabled. [default: 10] --logfile-max-size The maximum size (in MB) each log file can grow to before rotating. If set to 0, background file logging is disabled. [default: 200] diff --git a/book/src/help_general.md b/book/src/help_general.md index 47ebe60983..84bc67a86e 100644 --- a/book/src/help_general.md +++ b/book/src/help_general.md @@ -70,7 +70,7 @@ Options: [possible values: DEFAULT, JSON] --logfile-max-number The maximum number of log files that will be stored. If set to 0, - background file logging is disabled. [default: 5] + background file logging is disabled. [default: 10] --logfile-max-size The maximum size (in MB) each log file can grow to before rotating. If set to 0, background file logging is disabled. [default: 200] diff --git a/book/src/help_vc.md b/book/src/help_vc.md index 1dba75e521..347c818ede 100644 --- a/book/src/help_vc.md +++ b/book/src/help_vc.md @@ -86,7 +86,7 @@ Options: [possible values: DEFAULT, JSON] --logfile-max-number The maximum number of log files that will be stored. If set to 0, - background file logging is disabled. [default: 5] + background file logging is disabled. [default: 10] --logfile-max-size The maximum size (in MB) each log file can grow to before rotating. If set to 0, background file logging is disabled. [default: 200] diff --git a/book/src/help_vm.md b/book/src/help_vm.md index 6f9cc405e7..99a45c1a76 100644 --- a/book/src/help_vm.md +++ b/book/src/help_vm.md @@ -62,7 +62,7 @@ Options: [possible values: DEFAULT, JSON] --logfile-max-number The maximum number of log files that will be stored. If set to 0, - background file logging is disabled. [default: 5] + background file logging is disabled. [default: 10] --logfile-max-size The maximum size (in MB) each log file can grow to before rotating. If set to 0, background file logging is disabled. [default: 200] diff --git a/book/src/help_vm_create.md b/book/src/help_vm_create.md index 4ddb360e48..1803bb534c 100644 --- a/book/src/help_vm_create.md +++ b/book/src/help_vm_create.md @@ -74,7 +74,7 @@ Options: [possible values: DEFAULT, JSON] --logfile-max-number The maximum number of log files that will be stored. If set to 0, - background file logging is disabled. [default: 5] + background file logging is disabled. [default: 10] --logfile-max-size The maximum size (in MB) each log file can grow to before rotating. If set to 0, background file logging is disabled. [default: 200] diff --git a/book/src/help_vm_import.md b/book/src/help_vm_import.md index 799a1db82b..e18aad7958 100644 --- a/book/src/help_vm_import.md +++ b/book/src/help_vm_import.md @@ -43,7 +43,7 @@ Options: [possible values: DEFAULT, JSON] --logfile-max-number The maximum number of log files that will be stored. If set to 0, - background file logging is disabled. [default: 5] + background file logging is disabled. [default: 10] --logfile-max-size The maximum size (in MB) each log file can grow to before rotating. If set to 0, background file logging is disabled. [default: 200] diff --git a/book/src/help_vm_move.md b/book/src/help_vm_move.md index 9b92e21bc2..faef0a5783 100644 --- a/book/src/help_vm_move.md +++ b/book/src/help_vm_move.md @@ -63,7 +63,7 @@ Options: [possible values: DEFAULT, JSON] --logfile-max-number The maximum number of log files that will be stored. If set to 0, - background file logging is disabled. [default: 5] + background file logging is disabled. [default: 10] --logfile-max-size The maximum size (in MB) each log file can grow to before rotating. If set to 0, background file logging is disabled. [default: 200] diff --git a/lighthouse/src/main.rs b/lighthouse/src/main.rs index d6d670738a..481e17dbc8 100644 --- a/lighthouse/src/main.rs +++ b/lighthouse/src/main.rs @@ -169,7 +169,7 @@ fn main() { "The maximum number of log files that will be stored. If set to 0, \ background file logging is disabled.") .action(ArgAction::Set) - .default_value("5") + .default_value("10") .global(true) .display_order(0) ) From 79680c886dcdd1577edbd3a93f920df221cafc8c Mon Sep 17 00:00:00 2001 From: chonghe <44791194+chong-he@users.noreply.github.com> Date: Tue, 16 Jul 2024 12:39:55 +0800 Subject: [PATCH 5/8] Add `block_gossip` Beacon API events (#5864) * Add bls event * Update events and types * Add bls in event * Event bls * tests..rs * change order * another tests.rs * Signed BLS * Revert "another tests.rs" This reverts commit 7f54e9c1cea967e1fd6713fa7b11752c25d3b610. * Revert "Signed BLS" This reverts commit 1146bc734b3e8ed2067fb929d6b0a0b16b4154d3. * withdrawal_keyparis * Fix genesis * block gossip * Add definition for BlockGossip * Fix block gossip * Tests.rs * Update block and events * Add bls event * Event bls * tests..rs * change order * another tests.rs * Signed BLS * Revert "another tests.rs" This reverts commit 7f54e9c1cea967e1fd6713fa7b11752c25d3b610. * Revert "Signed BLS" This reverts commit 1146bc734b3e8ed2067fb929d6b0a0b16b4154d3. * block gossip * Add definition for BlockGossip * Fix block gossip * Tests.rs * Update block and events * Merge branch 'BeaconAPI-events-block-gossip' of https://github.com/chong-he/lighthouse into BeaconAPI-events-block-gossip * Remove tests * Tests.rs * Tests.rs * Tests.rs * Tests similar to block event * Update common/eth2/src/types.rs Co-authored-by: Michael Sproul * Merge remote-tracking branch 'origin/unstable' into BeaconAPI-events-block-gossip * Fix tests --- .../beacon_chain/src/block_verification.rs | 12 +++++++++++- beacon_node/beacon_chain/src/events.rs | 15 +++++++++++++++ beacon_node/http_api/src/lib.rs | 3 +++ beacon_node/http_api/tests/tests.rs | 15 +++++++++++++-- common/eth2/src/types.rs | 13 +++++++++++++ 5 files changed, 55 insertions(+), 3 deletions(-) diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 734b12ca83..33605c6b1d 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -67,7 +67,7 @@ use crate::{ metrics, BeaconChain, BeaconChainError, BeaconChainTypes, }; use derivative::Derivative; -use eth2::types::{EventKind, PublishBlockRequest}; +use eth2::types::{BlockGossip, EventKind, PublishBlockRequest}; use execution_layer::PayloadStatus; pub use fork_choice::{AttestationFromBlock, PayloadVerificationStatus}; use parking_lot::RwLockReadGuard; @@ -974,6 +974,16 @@ impl GossipVerifiedBlock { // Validate the block's execution_payload (if any). validate_execution_payload_for_gossip(&parent_block, block.message(), chain)?; + // Beacon API block_gossip events + if let Some(event_handler) = chain.event_handler.as_ref() { + if event_handler.has_block_gossip_subscribers() { + event_handler.register(EventKind::BlockGossip(Box::new(BlockGossip { + slot: block.slot(), + block: block_root, + }))); + } + } + // Having checked the proposer index and the block root we can cache them. let consensus_context = ConsensusContext::new(block.slot()) .set_current_block_root(block_root) diff --git a/beacon_node/beacon_chain/src/events.rs b/beacon_node/beacon_chain/src/events.rs index 5f91fe5d0c..267d56220c 100644 --- a/beacon_node/beacon_chain/src/events.rs +++ b/beacon_node/beacon_chain/src/events.rs @@ -23,6 +23,7 @@ pub struct ServerSentEventHandler { proposer_slashing_tx: Sender>, attester_slashing_tx: Sender>, bls_to_execution_change_tx: Sender>, + block_gossip_tx: Sender>, log: Logger, } @@ -51,6 +52,7 @@ impl ServerSentEventHandler { let (proposer_slashing_tx, _) = broadcast::channel(capacity); let (attester_slashing_tx, _) = broadcast::channel(capacity); let (bls_to_execution_change_tx, _) = broadcast::channel(capacity); + let (block_gossip_tx, _) = broadcast::channel(capacity); Self { attestation_tx, @@ -69,6 +71,7 @@ impl ServerSentEventHandler { proposer_slashing_tx, attester_slashing_tx, bls_to_execution_change_tx, + block_gossip_tx, log, } } @@ -147,6 +150,10 @@ impl ServerSentEventHandler { .bls_to_execution_change_tx .send(kind) .map(|count| log_count("bls to execution change", count)), + EventKind::BlockGossip(_) => self + .block_gossip_tx + .send(kind) + .map(|count| log_count("block gossip", count)), }; if let Err(SendError(event)) = result { trace!(self.log, "No receivers registered to listen for event"; "event" => ?event); @@ -217,6 +224,10 @@ impl ServerSentEventHandler { self.bls_to_execution_change_tx.subscribe() } + pub fn subscribe_block_gossip(&self) -> Receiver> { + self.block_gossip_tx.subscribe() + } + pub fn has_attestation_subscribers(&self) -> bool { self.attestation_tx.receiver_count() > 0 } @@ -272,4 +283,8 @@ impl ServerSentEventHandler { pub fn has_bls_to_execution_change_subscribers(&self) -> bool { self.bls_to_execution_change_tx.receiver_count() > 0 } + + pub fn has_block_gossip_subscribers(&self) -> bool { + self.block_gossip_tx.receiver_count() > 0 + } } diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 2d017d6539..59bf367b4c 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -4474,6 +4474,9 @@ pub fn serve( api_types::EventTopic::BlsToExecutionChange => { event_handler.subscribe_bls_to_execution_change() } + api_types::EventTopic::BlockGossip => { + event_handler.subscribe_block_gossip() + } }; receivers.push( diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index f511f25d32..633baaf6f4 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -5461,6 +5461,7 @@ impl ApiTester { EventTopic::Attestation, EventTopic::VoluntaryExit, EventTopic::Block, + EventTopic::BlockGossip, EventTopic::Head, EventTopic::FinalizedCheckpoint, EventTopic::AttesterSlashing, @@ -5576,10 +5577,20 @@ impl ApiTester { .await .unwrap(); - let block_events = poll_events(&mut events_future, 3, Duration::from_millis(10000)).await; + let expected_gossip = EventKind::BlockGossip(Box::new(BlockGossip { + slot: next_slot, + block: block_root, + })); + + let block_events = poll_events(&mut events_future, 4, Duration::from_millis(10000)).await; assert_eq!( block_events.as_slice(), - &[expected_block, expected_head, expected_finalized] + &[ + expected_gossip, + expected_block, + expected_head, + expected_finalized + ] ); // Test a reorg event diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index d399bc2bd0..bbcbda3ae5 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -971,6 +971,11 @@ pub struct SseHead { pub execution_optimistic: bool, } +#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] +pub struct BlockGossip { + pub slot: Slot, + pub block: Hash256, +} #[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] pub struct SseChainReorg { pub slot: Slot, @@ -1100,6 +1105,7 @@ pub enum EventKind { ProposerSlashing(Box), AttesterSlashing(Box>), BlsToExecutionChange(Box), + BlockGossip(Box), } impl EventKind { @@ -1122,6 +1128,7 @@ impl EventKind { EventKind::ProposerSlashing(_) => "proposer_slashing", EventKind::AttesterSlashing(_) => "attester_slashing", EventKind::BlsToExecutionChange(_) => "bls_to_execution_change", + EventKind::BlockGossip(_) => "block_gossip", } } @@ -1217,6 +1224,9 @@ impl EventKind { ServerError::InvalidServerSentEvent(format!("Bls To Execution Change: {:?}", e)) })?, )), + "block_gossip" => Ok(EventKind::BlockGossip(serde_json::from_str(data).map_err( + |e| ServerError::InvalidServerSentEvent(format!("Block Gossip: {:?}", e)), + )?)), _ => Err(ServerError::InvalidServerSentEvent( "Could not parse event tag".to_string(), )), @@ -1251,6 +1261,7 @@ pub enum EventTopic { AttesterSlashing, ProposerSlashing, BlsToExecutionChange, + BlockGossip, } impl FromStr for EventTopic { @@ -1275,6 +1286,7 @@ impl FromStr for EventTopic { "attester_slashing" => Ok(EventTopic::AttesterSlashing), "proposer_slashing" => Ok(EventTopic::ProposerSlashing), "bls_to_execution_change" => Ok(EventTopic::BlsToExecutionChange), + "block_gossip" => Ok(EventTopic::BlockGossip), _ => Err("event topic cannot be parsed.".to_string()), } } @@ -1300,6 +1312,7 @@ impl fmt::Display for EventTopic { EventTopic::AttesterSlashing => write!(f, "attester_slashing"), EventTopic::ProposerSlashing => write!(f, "proposer_slashing"), EventTopic::BlsToExecutionChange => write!(f, "bls_to_execution_change"), + EventTopic::BlockGossip => write!(f, "block_gossip"), } } } From 26c2e623cb265ea96b464344fe646394e3779423 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Tue, 16 Jul 2024 14:39:58 +1000 Subject: [PATCH 6/8] Measure consensus verification time (#6089) * Measure consensus verification time * Track execution time properly --- beacon_node/beacon_chain/src/beacon_chain.rs | 21 ++- .../beacon_chain/src/block_times_cache.rs | 122 +++++++++++++----- .../beacon_chain/src/block_verification.rs | 7 + .../beacon_chain/src/canonical_head.rs | 11 ++ beacon_node/beacon_chain/src/metrics.rs | 5 + 5 files changed, 124 insertions(+), 42 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 0595d53c07..19ee3d116c 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3088,14 +3088,21 @@ impl BeaconChain { notify_execution_layer, )?; publish_fn()?; + + // Record the time it took to complete consensus verification. + if let Some(timestamp) = self.slot_clock.now_duration() { + self.block_times_cache + .write() + .set_time_consensus_verified(block_root, block_slot, timestamp) + } + let executed_block = chain.into_executed_block(execution_pending).await?; - // Record the time it took to ask the execution layer. - if let Some(seen_timestamp) = self.slot_clock.now_duration() { - self.block_times_cache.write().set_execution_time( - block_root, - block_slot, - seen_timestamp, - ) + + // Record the *additional* time it took to wait for execution layer verification. + if let Some(timestamp) = self.slot_clock.now_duration() { + self.block_times_cache + .write() + .set_time_executed(block_root, block_slot, timestamp) } match executed_block { diff --git a/beacon_node/beacon_chain/src/block_times_cache.rs b/beacon_node/beacon_chain/src/block_times_cache.rs index db547a1186..3b75046f3a 100644 --- a/beacon_node/beacon_chain/src/block_times_cache.rs +++ b/beacon_node/beacon_chain/src/block_times_cache.rs @@ -19,7 +19,9 @@ type BlockRoot = Hash256; pub struct Timestamps { pub observed: Option, pub all_blobs_observed: Option, - pub execution_time: Option, + pub consensus_verified: Option, + pub started_execution: Option, + pub executed: Option, pub attestable: Option, pub imported: Option, pub set_as_head: Option, @@ -32,7 +34,9 @@ pub struct BlockDelays { pub observed: Option, /// The time after the start of the slot we saw all blobs. pub all_blobs_observed: Option, - /// The time it took to get verification from the EL for the block. + /// The time it took to complete consensus verification of the block. + pub consensus_verification_time: Option, + /// The time it took to complete execution verification of the block. pub execution_time: Option, /// The delay from the start of the slot before the block became available /// @@ -58,13 +62,16 @@ impl BlockDelays { let all_blobs_observed = times .all_blobs_observed .and_then(|all_blobs_observed| all_blobs_observed.checked_sub(slot_start_time)); + let consensus_verification_time = times + .consensus_verified + .and_then(|consensus_verified| consensus_verified.checked_sub(times.observed?)); let execution_time = times - .execution_time - .and_then(|execution_time| execution_time.checked_sub(times.observed?)); + .executed + .and_then(|executed| executed.checked_sub(times.started_execution?)); // Duration since UNIX epoch at which block became available. - let available_time = times.execution_time.map(|execution_time| { - std::cmp::max(execution_time, times.all_blobs_observed.unwrap_or_default()) - }); + let available_time = times + .executed + .map(|executed| std::cmp::max(executed, times.all_blobs_observed.unwrap_or_default())); // Duration from the start of the slot until the block became available. let available_delay = available_time.and_then(|available_time| available_time.checked_sub(slot_start_time)); @@ -80,6 +87,7 @@ impl BlockDelays { BlockDelays { observed, all_blobs_observed, + consensus_verification_time, execution_time, available: available_delay, attestable, @@ -155,6 +163,9 @@ impl BlockTimesCache { slot: Slot, timestamp: Duration, ) { + // Unlike other functions in this file, we update the blob observed time only if it is + // *greater* than existing blob observation times. This allows us to know the observation + // time of the last blob to arrive. let block_times = self .cache .entry(block_root) @@ -168,48 +179,89 @@ impl BlockTimesCache { } } - pub fn set_execution_time(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) { + /// Set the timestamp for `field` if that timestamp is less than any previously known value. + /// + /// If no previous value is known for the field, then the supplied timestamp will always be + /// stored. + pub fn set_time_if_less( + &mut self, + block_root: BlockRoot, + slot: Slot, + field: impl Fn(&mut Timestamps) -> &mut Option, + timestamp: Duration, + ) { let block_times = self .cache .entry(block_root) .or_insert_with(|| BlockTimesCacheValue::new(slot)); - if block_times - .timestamps - .execution_time - .map_or(true, |prev| timestamp < prev) - { - block_times.timestamps.execution_time = Some(timestamp); + let existing_timestamp = field(&mut block_times.timestamps); + if existing_timestamp.map_or(true, |prev| timestamp < prev) { + *existing_timestamp = Some(timestamp); } } + pub fn set_time_consensus_verified( + &mut self, + block_root: BlockRoot, + slot: Slot, + timestamp: Duration, + ) { + self.set_time_if_less( + block_root, + slot, + |timestamps| &mut timestamps.consensus_verified, + timestamp, + ) + } + + pub fn set_time_executed(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) { + self.set_time_if_less( + block_root, + slot, + |timestamps| &mut timestamps.executed, + timestamp, + ) + } + + pub fn set_time_started_execution( + &mut self, + block_root: BlockRoot, + slot: Slot, + timestamp: Duration, + ) { + self.set_time_if_less( + block_root, + slot, + |timestamps| &mut timestamps.started_execution, + timestamp, + ) + } + pub fn set_time_attestable(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) { - let block_times = self - .cache - .entry(block_root) - .or_insert_with(|| BlockTimesCacheValue::new(slot)); - if block_times - .timestamps - .attestable - .map_or(true, |prev| timestamp < prev) - { - block_times.timestamps.attestable = Some(timestamp); - } + self.set_time_if_less( + block_root, + slot, + |timestamps| &mut timestamps.attestable, + timestamp, + ) } pub fn set_time_imported(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) { - let block_times = self - .cache - .entry(block_root) - .or_insert_with(|| BlockTimesCacheValue::new(slot)); - block_times.timestamps.imported = Some(timestamp); + self.set_time_if_less( + block_root, + slot, + |timestamps| &mut timestamps.imported, + timestamp, + ) } pub fn set_time_set_as_head(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) { - let block_times = self - .cache - .entry(block_root) - .or_insert_with(|| BlockTimesCacheValue::new(slot)); - block_times.timestamps.set_as_head = Some(timestamp); + self.set_time_if_less( + block_root, + slot, + |timestamps| &mut timestamps.set_as_head, + timestamp, + ) } pub fn get_block_delays( diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 33605c6b1d..d906518ff5 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -1344,6 +1344,13 @@ impl ExecutionPendingBlock { // The specification declares that this should be run *inside* `per_block_processing`, // however we run it here to keep `per_block_processing` pure (i.e., no calls to external // servers). + if let Some(started_execution) = chain.slot_clock.now_duration() { + chain.block_times_cache.write().set_time_started_execution( + block_root, + block.slot(), + started_execution, + ); + } let payload_verification_status = payload_notifier.notify_new_payload().await?; // If the payload did not validate or invalidate the block, check to see if this block is diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs index a84cfab298..84e1544451 100644 --- a/beacon_node/beacon_chain/src/canonical_head.rs +++ b/beacon_node/beacon_chain/src/canonical_head.rs @@ -1385,6 +1385,15 @@ fn observe_head_block_delays( .as_millis() as i64, ); + // The time it took to check the validity within Lighthouse + metrics::set_gauge( + &metrics::BEACON_BLOCK_DELAY_CONSENSUS_VERIFICATION_TIME, + block_delays + .consensus_verification_time + .unwrap_or_else(|| Duration::from_secs(0)) + .as_millis() as i64, + ); + // The time it took to check the validity with the EL metrics::set_gauge( &metrics::BEACON_BLOCK_DELAY_EXECUTION_TIME, @@ -1447,6 +1456,7 @@ fn observe_head_block_delays( "total_delay_ms" => block_delay_total.as_millis(), "observed_delay_ms" => format_delay(&block_delays.observed), "blob_delay_ms" => format_delay(&block_delays.all_blobs_observed), + "consensus_time_ms" => format_delay(&block_delays.consensus_verification_time), "execution_time_ms" => format_delay(&block_delays.execution_time), "available_delay_ms" => format_delay(&block_delays.available), "attestable_delay_ms" => format_delay(&block_delays.attestable), @@ -1463,6 +1473,7 @@ fn observe_head_block_delays( "total_delay_ms" => block_delay_total.as_millis(), "observed_delay_ms" => format_delay(&block_delays.observed), "blob_delay_ms" => format_delay(&block_delays.all_blobs_observed), + "consensus_time_ms" => format_delay(&block_delays.consensus_verification_time), "execution_time_ms" => format_delay(&block_delays.execution_time), "available_delay_ms" => format_delay(&block_delays.available), "attestable_delay_ms" => format_delay(&block_delays.attestable), diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index be8f46f7d1..064b2b199f 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -857,6 +857,11 @@ lazy_static! { "Duration between the start of the block's slot and the time the block was observed.", ); + pub static ref BEACON_BLOCK_DELAY_CONSENSUS_VERIFICATION_TIME: Result = try_create_int_gauge( + "beacon_block_delay_consensus_verification_time", + "The time taken to verify the block within Lighthouse", + ); + pub static ref BEACON_BLOCK_DELAY_EXECUTION_TIME: Result = try_create_int_gauge( "beacon_block_delay_execution_time", "The duration in verifying the block with the execution layer.", From 77d491bea152d5a4e920bef30ba3529bfd89a757 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Tue, 16 Jul 2024 14:40:00 +1000 Subject: [PATCH 7/8] Remove trace logging (#6103) * Remove trace logging --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index a76ee7e236..fad5fbead1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -154,7 +154,7 @@ serde_json = "1" serde_repr = "0.1" serde_yaml = "0.9" sha2 = "0.9" -slog = { version = "2", features = ["max_level_trace", "release_max_level_trace", "nested-values"] } +slog = { version = "2", features = ["max_level_debug", "release_max_level_debug", "nested-values"] } slog-async = "2" slog-term = "2" sloggers = { version = "2", features = ["json"] } From bf2f0b02b84d071e7048b1e53afe2e6cb0746153 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Tue, 16 Jul 2024 08:57:58 +0100 Subject: [PATCH 8/8] Remove VC response signing and fix HTTP error handling (#5529) * and_then to then remove expect move convert_rejection to utils remove signer from vc api * remove key * remove auth header * revert * Merge branch 'unstable' of https://github.com/sigp/lighthouse into vc-api-fix * merge unstable * revert * Merge branch 'unstable' of https://github.com/sigp/lighthouse into vc-api-fix * Merge branch 'unstable' of https://github.com/sigp/lighthouse into vc-api-fix * refactor blocking json task * linting * revert logging * remove response signing checks in validtor http_api client * remove notion of public key, prefixes, and simplify token generation * fmt * Remove outdated comment on public key --- beacon_node/http_api/src/lib.rs | 8 +- beacon_node/http_api/src/task_spawner.rs | 19 +- common/eth2/src/lighthouse_vc/http_client.rs | 116 ++------ common/eth2/src/lighthouse_vc/mod.rs | 7 - common/warp_utils/src/reject.rs | 20 +- common/warp_utils/src/task.rs | 9 +- validator_client/src/http_api/api_secret.rs | 158 ++--------- validator_client/src/http_api/mod.rs | 275 ++++++------------- 8 files changed, 168 insertions(+), 444 deletions(-) diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 59bf367b4c..2d50dc6c63 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -97,7 +97,7 @@ use warp::hyper::Body; use warp::sse::Event; use warp::Reply; use warp::{http::Response, Filter, Rejection}; -use warp_utils::{query::multi_key_query, uor::UnifyingOrFilter}; +use warp_utils::{query::multi_key_query, reject::convert_rejection, uor::UnifyingOrFilter}; const API_PREFIX: &str = "eth"; @@ -1802,7 +1802,7 @@ pub fn serve( ) .await .map(|()| warp::reply::json(&())); - task_spawner::convert_rejection(result).await + convert_rejection(result).await }, ); @@ -3817,12 +3817,12 @@ pub fn serve( .await; if initial_result.is_err() { - return task_spawner::convert_rejection(initial_result).await; + return convert_rejection(initial_result).await; } // Await a response from the builder without blocking a // `BeaconProcessor` worker. - task_spawner::convert_rejection(rx.await.unwrap_or_else(|_| { + convert_rejection(rx.await.unwrap_or_else(|_| { Ok(warp::reply::with_status( warp::reply::json(&"No response from channel"), eth2::StatusCode::INTERNAL_SERVER_ERROR, diff --git a/beacon_node/http_api/src/task_spawner.rs b/beacon_node/http_api/src/task_spawner.rs index cfee5e01ca..a679b294f6 100644 --- a/beacon_node/http_api/src/task_spawner.rs +++ b/beacon_node/http_api/src/task_spawner.rs @@ -4,6 +4,7 @@ use std::future::Future; use tokio::sync::{mpsc::error::TrySendError, oneshot}; use types::EthSpec; use warp::reply::{Reply, Response}; +use warp_utils::reject::convert_rejection; /// Maps a request to a queue in the `BeaconProcessor`. #[derive(Clone, Copy)] @@ -35,24 +36,6 @@ pub struct TaskSpawner { beacon_processor_send: Option>, } -/// Convert a warp `Rejection` into a `Response`. -/// -/// This function should *always* be used to convert rejections into responses. This prevents warp -/// from trying to backtrack in strange ways. See: https://github.com/sigp/lighthouse/issues/3404 -pub async fn convert_rejection(res: Result) -> Response { - match res { - Ok(response) => response.into_response(), - Err(e) => match warp_utils::reject::handle_rejection(e).await { - Ok(reply) => reply.into_response(), - Err(_) => warp::reply::with_status( - warp::reply::json(&"unhandled error"), - eth2::StatusCode::INTERNAL_SERVER_ERROR, - ) - .into_response(), - }, - } -} - impl TaskSpawner { pub fn new(beacon_processor_send: Option>) -> Self { Self { diff --git a/common/eth2/src/lighthouse_vc/http_client.rs b/common/eth2/src/lighthouse_vc/http_client.rs index 83aeea4bfc..67fe77a315 100644 --- a/common/eth2/src/lighthouse_vc/http_client.rs +++ b/common/eth2/src/lighthouse_vc/http_client.rs @@ -1,13 +1,10 @@ -use super::{types::*, PK_LEN, SECRET_PREFIX}; +use super::types::*; use crate::Error; use account_utils::ZeroizeString; -use bytes::Bytes; -use libsecp256k1::{Message, PublicKey, Signature}; use reqwest::{ header::{HeaderMap, HeaderValue}, IntoUrl, }; -use ring::digest::{digest, SHA256}; use sensitive_url::SensitiveUrl; use serde::{de::DeserializeOwned, Serialize}; use std::fmt::{self, Display}; @@ -24,8 +21,7 @@ use types::graffiti::GraffitiString; pub struct ValidatorClientHttpClient { client: reqwest::Client, server: SensitiveUrl, - secret: Option, - server_pubkey: Option, + api_token: Option, authorization_header: AuthorizationHeader, } @@ -46,45 +42,13 @@ impl Display for AuthorizationHeader { } } -/// Parse an API token and return a secp256k1 public key. -/// -/// If the token does not start with the Lighthouse token prefix then `Ok(None)` will be returned. -/// An error will be returned if the token looks like a Lighthouse token but doesn't correspond to a -/// valid public key. -pub fn parse_pubkey(secret: &str) -> Result, Error> { - let secret = if !secret.starts_with(SECRET_PREFIX) { - return Ok(None); - } else { - &secret[SECRET_PREFIX.len()..] - }; - - serde_utils::hex::decode(secret) - .map_err(|e| Error::InvalidSecret(format!("invalid hex: {:?}", e))) - .and_then(|bytes| { - if bytes.len() != PK_LEN { - return Err(Error::InvalidSecret(format!( - "expected {} bytes not {}", - PK_LEN, - bytes.len() - ))); - } - - let mut arr = [0; PK_LEN]; - arr.copy_from_slice(&bytes); - PublicKey::parse_compressed(&arr) - .map_err(|e| Error::InvalidSecret(format!("invalid secp256k1 pubkey: {:?}", e))) - }) - .map(Some) -} - impl ValidatorClientHttpClient { /// Create a new client pre-initialised with an API token. pub fn new(server: SensitiveUrl, secret: String) -> Result { Ok(Self { client: reqwest::Client::new(), server, - server_pubkey: parse_pubkey(&secret)?, - secret: Some(secret.into()), + api_token: Some(secret.into()), authorization_header: AuthorizationHeader::Bearer, }) } @@ -96,8 +60,7 @@ impl ValidatorClientHttpClient { Ok(Self { client: reqwest::Client::new(), server, - secret: None, - server_pubkey: None, + api_token: None, authorization_header: AuthorizationHeader::Omit, }) } @@ -110,15 +73,14 @@ impl ValidatorClientHttpClient { Ok(Self { client, server, - server_pubkey: parse_pubkey(&secret)?, - secret: Some(secret.into()), + api_token: Some(secret.into()), authorization_header: AuthorizationHeader::Bearer, }) } /// Get a reference to this client's API token, if any. pub fn api_token(&self) -> Option<&ZeroizeString> { - self.secret.as_ref() + self.api_token.as_ref() } /// Read an API token from the specified `path`, stripping any trailing whitespace. @@ -128,19 +90,11 @@ impl ValidatorClientHttpClient { } /// Add an authentication token to use when making requests. - /// - /// If the token is Lighthouse-like, a pubkey derivation will be attempted. In the case - /// of failure the token will still be stored, and the client can continue to be used to - /// communicate with non-Lighthouse nodes. pub fn add_auth_token(&mut self, token: ZeroizeString) -> Result<(), Error> { - let pubkey_res = parse_pubkey(token.as_str()); - - self.secret = Some(token); + self.api_token = Some(token); self.authorization_header = AuthorizationHeader::Bearer; - pubkey_res.map(|opt_pubkey| { - self.server_pubkey = opt_pubkey; - }) + Ok(()) } /// Set to `false` to disable sending the `Authorization` header on requests. @@ -160,49 +114,17 @@ impl ValidatorClientHttpClient { self.authorization_header = AuthorizationHeader::Basic; } - async fn signed_body(&self, response: Response) -> Result { - let server_pubkey = self.server_pubkey.as_ref().ok_or(Error::NoServerPubkey)?; - let sig = response - .headers() - .get("Signature") - .ok_or(Error::MissingSignatureHeader)? - .to_str() - .map_err(|_| Error::InvalidSignatureHeader)? - .to_string(); - - let body = response.bytes().await.map_err(Error::from)?; - - let message = - Message::parse_slice(digest(&SHA256, &body).as_ref()).expect("sha256 is 32 bytes"); - - serde_utils::hex::decode(&sig) - .ok() - .and_then(|bytes| { - let sig = Signature::parse_der(&bytes).ok()?; - Some(libsecp256k1::verify(&message, &sig, server_pubkey)) - }) - .filter(|is_valid| *is_valid) - .ok_or(Error::InvalidSignatureHeader)?; - - Ok(body) - } - - async fn signed_json(&self, response: Response) -> Result { - let body = self.signed_body(response).await?; - serde_json::from_slice(&body).map_err(Error::InvalidJson) - } - fn headers(&self) -> Result { let mut headers = HeaderMap::new(); if self.authorization_header == AuthorizationHeader::Basic || self.authorization_header == AuthorizationHeader::Bearer { - let secret = self.secret.as_ref().ok_or(Error::NoToken)?; + let auth_header_token = self.api_token().ok_or(Error::NoToken)?; let header_value = HeaderValue::from_str(&format!( "{} {}", self.authorization_header, - secret.as_str() + auth_header_token.as_str() )) .map_err(|e| { Error::InvalidSecret(format!("secret is invalid as a header value: {}", e)) @@ -240,7 +162,8 @@ impl ValidatorClientHttpClient { async fn get(&self, url: U) -> Result { let response = self.get_response(url).await?; - self.signed_json(response).await + let body = response.bytes().await.map_err(Error::from)?; + serde_json::from_slice(&body).map_err(Error::InvalidJson) } async fn delete(&self, url: U) -> Result<(), Error> { @@ -263,7 +186,14 @@ impl ValidatorClientHttpClient { /// Perform a HTTP GET request, returning `None` on a 404 error. async fn get_opt(&self, url: U) -> Result, Error> { match self.get_response(url).await { - Ok(resp) => self.signed_json(resp).await.map(Option::Some), + Ok(resp) => { + let body = resp.bytes().await.map(Option::Some)?; + if let Some(body) = body { + serde_json::from_slice(&body).map_err(Error::InvalidJson) + } else { + Ok(None) + } + } Err(err) => { if err.status() == Some(StatusCode::NOT_FOUND) { Ok(None) @@ -297,7 +227,8 @@ impl ValidatorClientHttpClient { body: &T, ) -> Result { let response = self.post_with_raw_response(url, body).await?; - self.signed_json(response).await + let body = response.bytes().await.map_err(Error::from)?; + serde_json::from_slice(&body).map_err(Error::InvalidJson) } async fn post_with_unsigned_response( @@ -319,8 +250,7 @@ impl ValidatorClientHttpClient { .send() .await .map_err(Error::from)?; - let response = ok_or_error(response).await?; - self.signed_body(response).await?; + ok_or_error(response).await?; Ok(()) } diff --git a/common/eth2/src/lighthouse_vc/mod.rs b/common/eth2/src/lighthouse_vc/mod.rs index 81b4fca283..038726c829 100644 --- a/common/eth2/src/lighthouse_vc/mod.rs +++ b/common/eth2/src/lighthouse_vc/mod.rs @@ -1,10 +1,3 @@ pub mod http_client; pub mod std_types; pub mod types; - -/// The number of bytes in the secp256k1 public key used as the authorization token for the VC API. -pub const PK_LEN: usize = 33; - -/// The prefix for the secp256k1 public key when it is used as the authorization token for the VC -/// API. -pub const SECRET_PREFIX: &str = "api-token-"; diff --git a/common/warp_utils/src/reject.rs b/common/warp_utils/src/reject.rs index d33f32251b..9b28c65212 100644 --- a/common/warp_utils/src/reject.rs +++ b/common/warp_utils/src/reject.rs @@ -2,7 +2,7 @@ use eth2::types::{ErrorMessage, Failure, IndexedErrorMessage}; use std::convert::Infallible; use std::error::Error; use std::fmt; -use warp::{http::StatusCode, reject::Reject}; +use warp::{http::StatusCode, reject::Reject, reply::Response, Reply}; #[derive(Debug)] pub struct ServerSentEventError(pub String); @@ -255,3 +255,21 @@ pub async fn handle_rejection(err: warp::Rejection) -> Result(res: Result) -> Response { + match res { + Ok(response) => response.into_response(), + Err(e) => match handle_rejection(e).await { + Ok(reply) => reply.into_response(), + Err(_) => warp::reply::with_status( + warp::reply::json(&"unhandled error"), + eth2::StatusCode::INTERNAL_SERVER_ERROR, + ) + .into_response(), + }, + } +} diff --git a/common/warp_utils/src/task.rs b/common/warp_utils/src/task.rs index 001231f2c6..e2fa4ebc36 100644 --- a/common/warp_utils/src/task.rs +++ b/common/warp_utils/src/task.rs @@ -1,3 +1,4 @@ +use crate::reject::convert_rejection; use serde::Serialize; use warp::reply::{Reply, Response}; @@ -24,14 +25,16 @@ where } /// A convenience wrapper around `blocking_task` for use with `warp` JSON responses. -pub async fn blocking_json_task(func: F) -> Result +pub async fn blocking_json_task(func: F) -> Response where F: FnOnce() -> Result + Send + 'static, T: Serialize + Send + 'static, { - blocking_response_task(|| { + let result = blocking_response_task(|| { let response = func()?; Ok(warp::reply::json(&response)) }) - .await + .await; + + convert_rejection(result).await } diff --git a/validator_client/src/http_api/api_secret.rs b/validator_client/src/http_api/api_secret.rs index e688792ddc..32035caf47 100644 --- a/validator_client/src/http_api/api_secret.rs +++ b/validator_client/src/http_api/api_secret.rs @@ -1,85 +1,53 @@ -use eth2::lighthouse_vc::{PK_LEN, SECRET_PREFIX as PK_PREFIX}; use filesystem::create_with_600_perms; -use libsecp256k1::{Message, PublicKey, SecretKey}; -use rand::thread_rng; -use ring::digest::{digest, SHA256}; +use rand::distributions::Alphanumeric; +use rand::{thread_rng, Rng}; use std::fs; use std::path::{Path, PathBuf}; use warp::Filter; -/// The name of the file which stores the secret key. -/// -/// It is purposefully opaque to prevent users confusing it with the "secret" that they need to -/// share with API consumers (which is actually the public key). -pub const SK_FILENAME: &str = ".secp-sk"; - -/// Length of the raw secret key, in bytes. -pub const SK_LEN: usize = 32; - -/// The name of the file which stores the public key. -/// -/// For users, this public key is a "secret" that can be shared with API consumers to provide them -/// access to the API. We avoid calling it a "public" key to users, since they should not post this -/// value in a public forum. +/// The name of the file which stores the API token. pub const PK_FILENAME: &str = "api-token.txt"; -/// Contains a `secp256k1` keypair that is saved-to/loaded-from disk on instantiation. The keypair -/// is used for authorization/authentication for requests/responses on the HTTP API. +pub const PK_LEN: usize = 33; + +/// Contains a randomly generated string which is used for authorization of requests to the HTTP API. /// /// Provides convenience functions to ultimately provide: /// -/// - A signature across outgoing HTTP responses, applied to the `Signature` header. /// - Verification of proof-of-knowledge of the public key in `self` for incoming HTTP requests, /// via the `Authorization` header. /// /// The aforementioned scheme was first defined here: /// /// https://github.com/sigp/lighthouse/issues/1269#issuecomment-649879855 +/// +/// This scheme has since been tweaked to remove VC response signing and secp256k1 key generation. +/// https://github.com/sigp/lighthouse/issues/5423 pub struct ApiSecret { - pk: PublicKey, - sk: SecretKey, + pk: String, pk_path: PathBuf, } impl ApiSecret { - /// If both the secret and public keys are already on-disk, parse them and ensure they're both - /// from the same keypair. + /// If the public key is already on-disk, use it. /// - /// The provided `dir` is a directory containing two files, `SK_FILENAME` and `PK_FILENAME`. + /// The provided `dir` is a directory containing `PK_FILENAME`. /// - /// If either the secret or public key files are missing on disk, create a new keypair and + /// If the public key file is missing on disk, create a new key and /// write it to disk (over-writing any existing files). pub fn create_or_open>(dir: P) -> Result { - let sk_path = dir.as_ref().join(SK_FILENAME); let pk_path = dir.as_ref().join(PK_FILENAME); - if !(sk_path.exists() && pk_path.exists()) { - let sk = SecretKey::random(&mut thread_rng()); - let pk = PublicKey::from_secret_key(&sk); - - // Create and write the secret key to file with appropriate permissions - create_with_600_perms( - &sk_path, - serde_utils::hex::encode(sk.serialize()).as_bytes(), - ) - .map_err(|e| { - format!( - "Unable to create file with permissions for {:?}: {:?}", - sk_path, e - ) - })?; + if !pk_path.exists() { + let length = PK_LEN; + let pk: String = thread_rng() + .sample_iter(&Alphanumeric) + .take(length) + .map(char::from) + .collect(); // Create and write the public key to file with appropriate permissions - create_with_600_perms( - &pk_path, - format!( - "{}{}", - PK_PREFIX, - serde_utils::hex::encode(&pk.serialize_compressed()[..]) - ) - .as_bytes(), - ) - .map_err(|e| { + create_with_600_perms(&pk_path, pk.to_string().as_bytes()).map_err(|e| { format!( "Unable to create file with permissions for {:?}: {:?}", pk_path, e @@ -87,78 +55,18 @@ impl ApiSecret { })?; } - let sk = fs::read(&sk_path) - .map_err(|e| format!("cannot read {}: {}", SK_FILENAME, e)) - .and_then(|bytes| { - serde_utils::hex::decode(&String::from_utf8_lossy(&bytes)) - .map_err(|_| format!("{} should be 0x-prefixed hex", PK_FILENAME)) - }) - .and_then(|bytes| { - if bytes.len() == SK_LEN { - let mut array = [0; SK_LEN]; - array.copy_from_slice(&bytes); - SecretKey::parse(&array).map_err(|e| format!("invalid {}: {}", SK_FILENAME, e)) - } else { - Err(format!( - "{} expected {} bytes not {}", - SK_FILENAME, - SK_LEN, - bytes.len() - )) - } - })?; - let pk = fs::read(&pk_path) - .map_err(|e| format!("cannot read {}: {}", PK_FILENAME, e)) - .and_then(|bytes| { - let hex = - String::from_utf8(bytes).map_err(|_| format!("{} is not utf8", SK_FILENAME))?; - if let Some(stripped) = hex.strip_prefix(PK_PREFIX) { - serde_utils::hex::decode(stripped) - .map_err(|_| format!("{} should be 0x-prefixed hex", SK_FILENAME)) - } else { - Err(format!("unable to parse {}", SK_FILENAME)) - } - }) - .and_then(|bytes| { - if bytes.len() == PK_LEN { - let mut array = [0; PK_LEN]; - array.copy_from_slice(&bytes); - PublicKey::parse_compressed(&array) - .map_err(|e| format!("invalid {}: {}", PK_FILENAME, e)) - } else { - Err(format!( - "{} expected {} bytes not {}", - PK_FILENAME, - PK_LEN, - bytes.len() - )) - } - })?; + .map_err(|e| format!("cannot read {}: {}", PK_FILENAME, e))? + .iter() + .map(|&c| char::from(c)) + .collect(); - // Ensure that the keys loaded from disk are indeed a pair. - if PublicKey::from_secret_key(&sk) != pk { - fs::remove_file(&sk_path) - .map_err(|e| format!("unable to remove {}: {}", SK_FILENAME, e))?; - fs::remove_file(&pk_path) - .map_err(|e| format!("unable to remove {}: {}", PK_FILENAME, e))?; - return Err(format!( - "{:?} does not match {:?} and the files have been deleted. Please try again.", - sk_path, pk_path - )); - } - - Ok(Self { pk, sk, pk_path }) - } - - /// Returns the public key of `self` as a 0x-prefixed hex string. - fn pubkey_string(&self) -> String { - serde_utils::hex::encode(&self.pk.serialize_compressed()[..]) + Ok(Self { pk, pk_path }) } /// Returns the API token. pub fn api_token(&self) -> String { - format!("{}{}", PK_PREFIX, self.pubkey_string()) + self.pk.clone() } /// Returns the path for the API token file @@ -196,16 +104,4 @@ impl ApiSecret { .untuple_one() .boxed() } - - /// Returns a closure which produces a signature over some bytes using the secret key in - /// `self`. The signature is a 32-byte hash formatted as a 0x-prefixed string. - pub fn signer(&self) -> impl Fn(&[u8]) -> String + Clone { - let sk = self.sk; - move |input: &[u8]| -> String { - let message = - Message::parse_slice(digest(&SHA256, input).as_ref()).expect("sha256 is 32 bytes"); - let (signature, _) = libsecp256k1::sign(&message, &sk); - serde_utils::hex::encode(signature.serialize_der().as_ref()) - } - } } diff --git a/validator_client/src/http_api/mod.rs b/validator_client/src/http_api/mod.rs index a4480195e5..3d7cab8e5e 100644 --- a/validator_client/src/http_api/mod.rs +++ b/validator_client/src/http_api/mod.rs @@ -45,15 +45,8 @@ use task_executor::TaskExecutor; use tokio_stream::{wrappers::BroadcastStream, StreamExt}; use types::{ChainSpec, ConfigAndPreset, EthSpec}; use validator_dir::Builder as ValidatorDirBuilder; -use warp::{ - http::{ - header::{HeaderValue, CONTENT_TYPE}, - response::Response, - StatusCode, - }, - sse::Event, - Filter, -}; +use warp::{sse::Event, Filter}; +use warp_utils::task::blocking_json_task; #[derive(Debug)] pub enum Error { @@ -176,9 +169,6 @@ pub fn serve( } }; - let signer = ctx.api_secret.signer(); - let signer = warp::any().map(move || signer.clone()); - let inner_validator_store = ctx.validator_store.clone(); let validator_store_filter = warp::any() .map(move || inner_validator_store.clone()) @@ -270,9 +260,8 @@ pub fn serve( let get_node_version = warp::path("lighthouse") .and(warp::path("version")) .and(warp::path::end()) - .and(signer.clone()) - .and_then(|signer| { - blocking_signed_json_task(signer, move || { + .then(|| { + blocking_json_task(move || { Ok(api_types::GenericResponse::from(api_types::VersionData { version: version_with_platform(), })) @@ -283,9 +272,8 @@ pub fn serve( let get_lighthouse_health = warp::path("lighthouse") .and(warp::path("health")) .and(warp::path::end()) - .and(signer.clone()) - .and_then(|signer| { - blocking_signed_json_task(signer, move || { + .then(|| { + blocking_json_task(move || { eth2::lighthouse::Health::observe() .map(api_types::GenericResponse::from) .map_err(warp_utils::reject::custom_bad_request) @@ -297,9 +285,8 @@ pub fn serve( .and(warp::path("spec")) .and(warp::path::end()) .and(spec_filter.clone()) - .and(signer.clone()) - .and_then(|spec: Arc<_>, signer| { - blocking_signed_json_task(signer, move || { + .then(|spec: Arc<_>| { + blocking_json_task(move || { let config = ConfigAndPreset::from_chain_spec::(&spec, None); Ok(api_types::GenericResponse::from(config)) }) @@ -310,9 +297,8 @@ pub fn serve( .and(warp::path("validators")) .and(warp::path::end()) .and(validator_store_filter.clone()) - .and(signer.clone()) - .and_then(|validator_store: Arc>, signer| { - blocking_signed_json_task(signer, move || { + .then(|validator_store: Arc>| { + blocking_json_task(move || { let validators = validator_store .initialized_validators() .read() @@ -335,10 +321,9 @@ pub fn serve( .and(warp::path::param::()) .and(warp::path::end()) .and(validator_store_filter.clone()) - .and(signer.clone()) - .and_then( - |validator_pubkey: PublicKey, validator_store: Arc>, signer| { - blocking_signed_json_task(signer, move || { + .then( + |validator_pubkey: PublicKey, validator_store: Arc>| { + blocking_json_task(move || { let validator = validator_store .initialized_validators() .read() @@ -370,9 +355,8 @@ pub fn serve( .and(system_info_filter) .and(app_start_filter) .and(validator_dir_filter.clone()) - .and(signer.clone()) - .and_then(|sysinfo, app_start: std::time::Instant, val_dir, signer| { - blocking_signed_json_task(signer, move || { + .then(|sysinfo, app_start: std::time::Instant, val_dir| { + blocking_json_task(move || { let app_uptime = app_start.elapsed().as_secs(); Ok(api_types::GenericResponse::from(observe_system_health_vc( sysinfo, val_dir, app_uptime, @@ -387,15 +371,13 @@ pub fn serve( .and(validator_store_filter.clone()) .and(graffiti_file_filter.clone()) .and(graffiti_flag_filter) - .and(signer.clone()) .and(log_filter.clone()) - .and_then( + .then( |validator_store: Arc>, graffiti_file: Option, graffiti_flag: Option, - signer, log| { - blocking_signed_json_task(signer, move || { + blocking_json_task(move || { let mut result = HashMap::new(); for (key, graffiti_definition) in validator_store .initialized_validators() @@ -425,17 +407,15 @@ pub fn serve( .and(secrets_dir_filter.clone()) .and(validator_store_filter.clone()) .and(spec_filter.clone()) - .and(signer.clone()) .and(task_executor_filter.clone()) - .and_then( + .then( move |body: Vec, validator_dir: PathBuf, secrets_dir: PathBuf, validator_store: Arc>, spec: Arc, - signer, task_executor: TaskExecutor| { - blocking_signed_json_task(signer, move || { + blocking_json_task(move || { let secrets_dir = store_passwords_in_secrets_dir.then_some(secrets_dir); if let Some(handle) = task_executor.handle() { let (validators, mnemonic) = @@ -472,17 +452,15 @@ pub fn serve( .and(secrets_dir_filter.clone()) .and(validator_store_filter.clone()) .and(spec_filter) - .and(signer.clone()) .and(task_executor_filter.clone()) - .and_then( + .then( move |body: api_types::CreateValidatorsMnemonicRequest, validator_dir: PathBuf, secrets_dir: PathBuf, validator_store: Arc>, spec: Arc, - signer, task_executor: TaskExecutor| { - blocking_signed_json_task(signer, move || { + blocking_json_task(move || { let secrets_dir = store_passwords_in_secrets_dir.then_some(secrets_dir); if let Some(handle) = task_executor.handle() { let mnemonic = @@ -521,16 +499,14 @@ pub fn serve( .and(validator_dir_filter.clone()) .and(secrets_dir_filter.clone()) .and(validator_store_filter.clone()) - .and(signer.clone()) .and(task_executor_filter.clone()) - .and_then( + .then( move |body: api_types::KeystoreValidatorsPostRequest, validator_dir: PathBuf, secrets_dir: PathBuf, validator_store: Arc>, - signer, task_executor: TaskExecutor| { - blocking_signed_json_task(signer, move || { + blocking_json_task(move || { // Check to ensure the password is correct. let keypair = body .keystore @@ -611,14 +587,12 @@ pub fn serve( .and(warp::path::end()) .and(warp::body::json()) .and(validator_store_filter.clone()) - .and(signer.clone()) .and(task_executor_filter.clone()) - .and_then( + .then( |body: Vec, validator_store: Arc>, - signer, task_executor: TaskExecutor| { - blocking_signed_json_task(signer, move || { + blocking_json_task(move || { if let Some(handle) = task_executor.handle() { let web3signers: Vec = body .into_iter() @@ -666,16 +640,14 @@ pub fn serve( .and(warp::body::json()) .and(validator_store_filter.clone()) .and(graffiti_file_filter.clone()) - .and(signer.clone()) .and(task_executor_filter.clone()) - .and_then( + .then( |validator_pubkey: PublicKey, body: api_types::ValidatorPatchRequest, validator_store: Arc>, graffiti_file: Option, - signer, task_executor: TaskExecutor| { - blocking_signed_json_task(signer, move || { + blocking_json_task(move || { if body.graffiti.is_some() && graffiti_file.is_some() { return Err(warp_utils::reject::custom_bad_request( "Unable to update graffiti as the \"--graffiti-file\" flag is set" @@ -784,10 +756,9 @@ pub fn serve( // GET /lighthouse/auth let get_auth = warp::path("lighthouse").and(warp::path("auth").and(warp::path::end())); let get_auth = get_auth - .and(signer.clone()) .and(api_token_path_filter) - .and_then(|signer, token_path: PathBuf| { - blocking_signed_json_task(signer, move || { + .then(move |token_path: PathBuf| { + blocking_json_task(move || { Ok(AuthResponse { token_path: token_path.display().to_string(), }) @@ -799,23 +770,20 @@ pub fn serve( .and(warp::path("keystores")) .and(warp::path::end()) .and(warp::body::json()) - .and(signer.clone()) .and(validator_store_filter.clone()) .and(task_executor_filter.clone()) .and(log_filter.clone()) - .and_then( - move |request, signer, validator_store, task_executor, log| { - blocking_signed_json_task(signer, move || { - if allow_keystore_export { - keystores::export(request, validator_store, task_executor, log) - } else { - Err(warp_utils::reject::custom_bad_request( - "keystore export is disabled".to_string(), - )) - } - }) - }, - ); + .then(move |request, validator_store, task_executor, log| { + blocking_json_task(move || { + if allow_keystore_export { + keystores::export(request, validator_store, task_executor, log) + } else { + Err(warp_utils::reject::custom_bad_request( + "keystore export is disabled".to_string(), + )) + } + }) + }); // Standard key-manager endpoints. let eth_v1 = warp::path("eth").and(warp::path("v1")); @@ -829,10 +797,9 @@ pub fn serve( .and(warp::path("feerecipient")) .and(warp::path::end()) .and(validator_store_filter.clone()) - .and(signer.clone()) - .and_then( - |validator_pubkey: PublicKey, validator_store: Arc>, signer| { - blocking_signed_json_task(signer, move || { + .then( + |validator_pubkey: PublicKey, validator_store: Arc>| { + blocking_json_task(move || { if validator_store .initialized_validators() .read() @@ -869,13 +836,11 @@ pub fn serve( .and(warp::body::json()) .and(warp::path::end()) .and(validator_store_filter.clone()) - .and(signer.clone()) - .and_then( + .then( |validator_pubkey: PublicKey, request: api_types::UpdateFeeRecipientRequest, - validator_store: Arc>, - signer| { - blocking_signed_json_task(signer, move || { + validator_store: Arc>| { + blocking_json_task(move || { if validator_store .initialized_validators() .read() @@ -909,10 +874,9 @@ pub fn serve( .and(warp::path("feerecipient")) .and(warp::path::end()) .and(validator_store_filter.clone()) - .and(signer.clone()) - .and_then( - |validator_pubkey: PublicKey, validator_store: Arc>, signer| { - blocking_signed_json_task(signer, move || { + .then( + |validator_pubkey: PublicKey, validator_store: Arc>| { + blocking_json_task(move || { if validator_store .initialized_validators() .read() @@ -946,10 +910,9 @@ pub fn serve( .and(warp::path("gas_limit")) .and(warp::path::end()) .and(validator_store_filter.clone()) - .and(signer.clone()) - .and_then( - |validator_pubkey: PublicKey, validator_store: Arc>, signer| { - blocking_signed_json_task(signer, move || { + .then( + |validator_pubkey: PublicKey, validator_store: Arc>| { + blocking_json_task(move || { if validator_store .initialized_validators() .read() @@ -978,13 +941,11 @@ pub fn serve( .and(warp::body::json()) .and(warp::path::end()) .and(validator_store_filter.clone()) - .and(signer.clone()) - .and_then( + .then( |validator_pubkey: PublicKey, request: api_types::UpdateGasLimitRequest, - validator_store: Arc>, - signer| { - blocking_signed_json_task(signer, move || { + validator_store: Arc>| { + blocking_json_task(move || { if validator_store .initialized_validators() .read() @@ -1018,10 +979,9 @@ pub fn serve( .and(warp::path("gas_limit")) .and(warp::path::end()) .and(validator_store_filter.clone()) - .and(signer.clone()) - .and_then( - |validator_pubkey: PublicKey, validator_store: Arc>, signer| { - blocking_signed_json_task(signer, move || { + .then( + |validator_pubkey: PublicKey, validator_store: Arc>| { + blocking_json_task(move || { if validator_store .initialized_validators() .read() @@ -1058,17 +1018,15 @@ pub fn serve( .and(validator_store_filter.clone()) .and(slot_clock_filter) .and(log_filter.clone()) - .and(signer.clone()) .and(task_executor_filter.clone()) - .and_then( + .then( |pubkey: PublicKey, query: api_types::VoluntaryExitQuery, validator_store: Arc>, slot_clock: T, log, - signer, task_executor: TaskExecutor| { - blocking_signed_json_task(signer, move || { + blocking_json_task(move || { if let Some(handle) = task_executor.handle() { let signed_voluntary_exit = handle.block_on(create_signed_voluntary_exit( @@ -1096,13 +1054,11 @@ pub fn serve( .and(warp::path::end()) .and(validator_store_filter.clone()) .and(graffiti_flag_filter) - .and(signer.clone()) - .and_then( + .then( |pubkey: PublicKey, validator_store: Arc>, - graffiti_flag: Option, - signer| { - blocking_signed_json_task(signer, move || { + graffiti_flag: Option| { + blocking_json_task(move || { let graffiti = get_graffiti(pubkey.clone(), validator_store, graffiti_flag)?; Ok(GenericResponse::from(GetGraffitiResponse { pubkey: pubkey.into(), @@ -1121,14 +1077,12 @@ pub fn serve( .and(warp::path::end()) .and(validator_store_filter.clone()) .and(graffiti_file_filter.clone()) - .and(signer.clone()) - .and_then( + .then( |pubkey: PublicKey, query: SetGraffitiRequest, validator_store: Arc>, - graffiti_file: Option, - signer| { - blocking_signed_json_task(signer, move || { + graffiti_file: Option| { + blocking_json_task(move || { if graffiti_file.is_some() { return Err(warp_utils::reject::invalid_auth( "Unable to update graffiti as the \"--graffiti-file\" flag is set" @@ -1149,13 +1103,11 @@ pub fn serve( .and(warp::path::end()) .and(validator_store_filter.clone()) .and(graffiti_file_filter.clone()) - .and(signer.clone()) - .and_then( + .then( |pubkey: PublicKey, validator_store: Arc>, - graffiti_file: Option, - signer| { - blocking_signed_json_task(signer, move || { + graffiti_file: Option| { + blocking_json_task(move || { if graffiti_file.is_some() { return Err(warp_utils::reject::invalid_auth( "Unable to delete graffiti as the \"--graffiti-file\" flag is set" @@ -1169,32 +1121,24 @@ pub fn serve( .map(|reply| warp::reply::with_status(reply, warp::http::StatusCode::NO_CONTENT)); // GET /eth/v1/keystores - let get_std_keystores = std_keystores - .and(signer.clone()) - .and(validator_store_filter.clone()) - .and_then(|signer, validator_store: Arc>| { - blocking_signed_json_task(signer, move || Ok(keystores::list(validator_store))) - }); + let get_std_keystores = std_keystores.and(validator_store_filter.clone()).then( + |validator_store: Arc>| { + blocking_json_task(move || Ok(keystores::list(validator_store))) + }, + ); // POST /eth/v1/keystores let post_std_keystores = std_keystores .and(warp::body::json()) - .and(signer.clone()) .and(validator_dir_filter) .and(secrets_dir_filter) .and(validator_store_filter.clone()) .and(task_executor_filter.clone()) .and(log_filter.clone()) - .and_then( - move |request, - signer, - validator_dir, - secrets_dir, - validator_store, - task_executor, - log| { + .then( + move |request, validator_dir, secrets_dir, validator_store, task_executor, log| { let secrets_dir = store_passwords_in_secrets_dir.then_some(secrets_dir); - blocking_signed_json_task(signer, move || { + blocking_json_task(move || { keystores::import( request, validator_dir, @@ -1210,33 +1154,30 @@ pub fn serve( // DELETE /eth/v1/keystores let delete_std_keystores = std_keystores .and(warp::body::json()) - .and(signer.clone()) .and(validator_store_filter.clone()) .and(task_executor_filter.clone()) .and(log_filter.clone()) - .and_then(|request, signer, validator_store, task_executor, log| { - blocking_signed_json_task(signer, move || { + .then(|request, validator_store, task_executor, log| { + blocking_json_task(move || { keystores::delete(request, validator_store, task_executor, log) }) }); // GET /eth/v1/remotekeys - let get_std_remotekeys = std_remotekeys - .and(signer.clone()) - .and(validator_store_filter.clone()) - .and_then(|signer, validator_store: Arc>| { - blocking_signed_json_task(signer, move || Ok(remotekeys::list(validator_store))) - }); + let get_std_remotekeys = std_remotekeys.and(validator_store_filter.clone()).then( + |validator_store: Arc>| { + blocking_json_task(move || Ok(remotekeys::list(validator_store))) + }, + ); // POST /eth/v1/remotekeys let post_std_remotekeys = std_remotekeys .and(warp::body::json()) - .and(signer.clone()) .and(validator_store_filter.clone()) .and(task_executor_filter.clone()) .and(log_filter.clone()) - .and_then(|request, signer, validator_store, task_executor, log| { - blocking_signed_json_task(signer, move || { + .then(|request, validator_store, task_executor, log| { + blocking_json_task(move || { remotekeys::import(request, validator_store, task_executor, log) }) }); @@ -1244,12 +1185,11 @@ pub fn serve( // DELETE /eth/v1/remotekeys let delete_std_remotekeys = std_remotekeys .and(warp::body::json()) - .and(signer) .and(validator_store_filter) .and(task_executor_filter) .and(log_filter.clone()) - .and_then(|request, signer, validator_store, task_executor, log| { - blocking_signed_json_task(signer, move || { + .then(|request, validator_store, task_executor, log| { + blocking_json_task(move || { remotekeys::delete(request, validator_store, task_executor, log) }) }); @@ -1369,42 +1309,3 @@ pub fn serve( Ok((listening_socket, server)) } - -/// Executes `func` in blocking tokio task (i.e., where long-running tasks are permitted). -/// JSON-encodes the return value of `func`, using the `signer` function to produce a signature of -/// those bytes. -pub async fn blocking_signed_json_task( - signer: S, - func: F, -) -> Result -where - S: Fn(&[u8]) -> String, - F: FnOnce() -> Result + Send + 'static, - T: Serialize + Send + 'static, -{ - warp_utils::task::blocking_task(func) - .await - .map(|func_output| { - let mut response = match serde_json::to_vec(&func_output) { - Ok(body) => { - let mut res = Response::new(body); - res.headers_mut() - .insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); - res - } - Err(_) => Response::builder() - .status(StatusCode::INTERNAL_SERVER_ERROR) - .body(vec![]) - .expect("can produce simple response from static values"), - }; - - let body: &Vec = response.body(); - let signature = signer(body); - let header_value = - HeaderValue::from_str(&signature).expect("hash can be encoded as header"); - - response.headers_mut().append("Signature", header_value); - - response - }) -}