diff --git a/.github/mergify.yml b/.github/mergify.yml index 73267904b8..4ab73bcf07 100644 --- a/.github/mergify.yml +++ b/.github/mergify.yml @@ -1,8 +1,10 @@ pull_request_rules: - name: Ask to resolve conflict conditions: + - -closed - conflict - -author=dependabot[bot] + - label=ready-for-review - or: - -draft # Don't report conflicts on regular draft. - and: # Do report conflicts on draft that are scheduled for the next major release. @@ -12,6 +14,64 @@ pull_request_rules: comment: message: This pull request has merge conflicts. Could you please resolve them @{{author}}? 🙏 + label: + add: + - waiting-on-author + remove: + - ready-for-review + + - name: Ask to resolve CI failures + conditions: + - -closed + - label=ready-for-review + - or: + - check-skipped=test-suite-success + - check-skipped=local-testnet-success + - check-failure=test-suite-success + - check-failure=local-testnet-success + actions: + comment: + message: Some required checks have failed. Could you please take a look @{{author}}? 🙏 + label: + add: + - waiting-on-author + remove: + - ready-for-review + + - name: Update labels when PR is unblocked + conditions: + - -closed + - -draft + - label=waiting-on-author + - -conflict + # Unfortunately, it doesn't look like there's an easy way to check for PRs pending + # CI workflows approvals. + - check-success=test-suite-success + - check-success=local-testnet-success + # Update the label only if there are no more change requests from any reviewers and no unresolved threads. + # This rule ensures that a PR with passing CI can be marked as `waiting-on-author`. + - "#changes-requested-reviews-by = 0" + - "#review-threads-unresolved = 0" + actions: + label: + remove: + - waiting-on-author + add: + - ready-for-review + + - name: Close stale pull request after 30 days of inactivity + conditions: + - -closed + - label=waiting-on-author + - updated-at<=30 days ago + actions: + close: + message: > + Hi @{{author}}, this pull request has been closed automatically due to 30 days of inactivity. + If you’d like to continue working on it, feel free to reopen at any time. + label: + add: + - stale - name: Approve trivial maintainer PRs conditions: diff --git a/Cargo.lock b/Cargo.lock index 37cb553bed..af5d63e97f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -937,7 +937,7 @@ dependencies = [ [[package]] name = "beacon_node" -version = "7.1.0-beta.0" +version = "7.1.0" dependencies = [ "account_utils", "beacon_chain", @@ -1212,7 +1212,7 @@ dependencies = [ [[package]] name = "boot_node" -version = "7.1.0-beta.0" +version = "7.1.0" dependencies = [ "beacon_node", "bytes", @@ -5050,7 +5050,7 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "lcli" -version = "7.1.0-beta.0" +version = "7.1.0" dependencies = [ "account_utils", "beacon_chain", @@ -5612,7 +5612,7 @@ dependencies = [ [[package]] name = "lighthouse" -version = "7.1.0-beta.0" +version = "7.1.0" dependencies = [ "account_manager", "account_utils", diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index 596419c33e..456376e79b 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "beacon_node" -version = "7.1.0-beta.0" +version = "7.1.0" authors = [ "Paul Hauner ", "Age Manning VerifiedLightClientFinalityUpdate { // verify that enough time has passed for the block to have been propagated let start_time = chain .slot_clock - .start_of(*rcv_finality_update.signature_slot()) + .start_of(rcv_finality_update.signature_slot()) .ok_or(Error::SigSlotStartIsNone)?; let one_third_slot_duration = Duration::new(chain.spec.seconds_per_slot / 3, 0); if seen_timestamp + chain.spec.maximum_gossip_clock_disparity() @@ -57,16 +82,76 @@ impl VerifiedLightClientFinalityUpdate { return Err(Error::TooEarly); } + if let Some(latest_broadcasted_finality_update) = chain + .light_client_server_cache + .get_latest_broadcasted_finality_update() + { + // Ignore the incoming finality update if we've already broadcasted it + if latest_broadcasted_finality_update == rcv_finality_update { + return Err(Error::Ignore); + } + + // Ignore the incoming finality update if the latest broadcasted attested header slot + // is greater than the incoming attested header slot. + if latest_broadcasted_finality_update.get_attested_header_slot() + > rcv_finality_update.get_attested_header_slot() + { + return Err(Error::Ignore); + } + } + let latest_finality_update = chain .light_client_server_cache .get_latest_finality_update() .ok_or(Error::FailedConstructingUpdate)?; - // verify that the gossiped finality update is the same as the locally constructed one. - if latest_finality_update != rcv_finality_update { - return Err(Error::InvalidLightClientFinalityUpdate); + // Ignore the incoming finality update if the latest constructed attested header slot + // is greater than the incoming attested header slot. + if latest_finality_update.get_attested_header_slot() + > rcv_finality_update.get_attested_header_slot() + { + return Err(Error::Ignore); } + // Verify that the gossiped finality update is the same as the locally constructed one. + if latest_finality_update != rcv_finality_update { + let signature_slot = latest_finality_update.signature_slot(); + if signature_slot != rcv_finality_update.signature_slot() { + return Err(Error::MismatchedSignatureSlot { + local: signature_slot, + observed: rcv_finality_update.signature_slot(), + }); + } + let local_finalized_header_root = latest_finality_update.get_finalized_header_root(); + let observed_finalized_header_root = rcv_finality_update.get_finalized_header_root(); + if local_finalized_header_root != observed_finalized_header_root { + return Err(Error::MismatchedFinalizedHeader { + local_finalized_header_root, + observed_finalized_header_root, + signature_slot, + }); + } + let local_attested_header_root = latest_finality_update.get_attested_header_root(); + let observed_attested_header_root = rcv_finality_update.get_attested_header_root(); + if local_attested_header_root != observed_attested_header_root { + return Err(Error::MismatchedAttestedHeader { + local_attested_header_root, + observed_attested_header_root, + finalized_header_root: local_finalized_header_root, + signature_slot, + }); + } + return Err(Error::MismatchedProofOrSyncAggregate { + attested_header_root: local_attested_header_root, + finalized_header_root: local_finalized_header_root, + signature_slot, + }); + } + + chain + .light_client_server_cache + .set_latest_broadcasted_finality_update(rcv_finality_update.clone()); + Ok(Self { light_client_finality_update: rcv_finality_update, seen_timestamp, diff --git a/beacon_node/beacon_chain/src/light_client_optimistic_update_verification.rs b/beacon_node/beacon_chain/src/light_client_optimistic_update_verification.rs index 5665adc3ed..4da6913443 100644 --- a/beacon_node/beacon_chain/src/light_client_optimistic_update_verification.rs +++ b/beacon_node/beacon_chain/src/light_client_optimistic_update_verification.rs @@ -4,7 +4,7 @@ use eth2::types::Hash256; use slot_clock::SlotClock; use std::time::Duration; use strum::AsRefStr; -use types::LightClientOptimisticUpdate; +use types::{LightClientOptimisticUpdate, Slot}; /// Returned when a light client optimistic update was not successfully verified. It might not have been verified for /// two reasons: @@ -22,14 +22,30 @@ pub enum Error { /// /// Assuming the local clock is correct, the peer has sent an invalid message. TooEarly, - /// Light client optimistic update message does not match the locally constructed one. - InvalidLightClientOptimisticUpdate, + /// Light client optimistic update message does not match the locally constructed one, it has a + /// different signature slot. + MismatchedSignatureSlot { local: Slot, observed: Slot }, + /// Light client optimistic update message does not match the locally constructed one, it has a + /// different block header at the same slot. + MismatchedAttestedHeader { + local_attested_header_root: Hash256, + observed_attested_header_root: Hash256, + signature_slot: Slot, + }, + /// Light client optimistic update message does not match the locally constructed one, it has a + /// different sync aggregate for the same slot and attested header. + MismatchedSyncAggregate { + attested_header_root: Hash256, + signature_slot: Slot, + }, /// Signature slot start time is none. SigSlotStartIsNone, /// Failed to construct a LightClientOptimisticUpdate from state. FailedConstructingUpdate, /// Unknown block with parent root. UnknownBlockParentRoot(Hash256), + /// Silently ignore this light client optimistic update + Ignore, } /// Wraps a `LightClientOptimisticUpdate` that has been verified for propagation on the gossip network. @@ -52,7 +68,7 @@ impl VerifiedLightClientOptimisticUpdate { // verify that enough time has passed for the block to have been propagated let start_time = chain .slot_clock - .start_of(*rcv_optimistic_update.signature_slot()) + .start_of(rcv_optimistic_update.signature_slot()) .ok_or(Error::SigSlotStartIsNone)?; let one_third_slot_duration = Duration::new(chain.spec.seconds_per_slot / 3, 0); if seen_timestamp + chain.spec.maximum_gossip_clock_disparity() @@ -61,6 +77,22 @@ impl VerifiedLightClientOptimisticUpdate { return Err(Error::TooEarly); } + if let Some(latest_broadcasted_optimistic_update) = chain + .light_client_server_cache + .get_latest_broadcasted_optimistic_update() + { + // Ignore the incoming optimistic update if we've already broadcasted it + if latest_broadcasted_optimistic_update == rcv_optimistic_update { + return Err(Error::Ignore); + } + + // Ignore the incoming optimistic update if the latest broadcasted slot + // is greater than the incoming slot. + if latest_broadcasted_optimistic_update.get_slot() > rcv_optimistic_update.get_slot() { + return Err(Error::Ignore); + } + } + let head = chain.canonical_head.cached_head(); let head_block = &head.snapshot.beacon_block; // check if we can process the optimistic update immediately @@ -76,11 +108,40 @@ impl VerifiedLightClientOptimisticUpdate { .get_latest_optimistic_update() .ok_or(Error::FailedConstructingUpdate)?; - // verify that the gossiped optimistic update is the same as the locally constructed one. - if latest_optimistic_update != rcv_optimistic_update { - return Err(Error::InvalidLightClientOptimisticUpdate); + // Ignore the incoming optimistic update if the latest constructed slot + // is greater than the incoming slot. + if latest_optimistic_update.get_slot() > rcv_optimistic_update.get_slot() { + return Err(Error::Ignore); } + // Verify that the gossiped optimistic update is the same as the locally constructed one. + if latest_optimistic_update != rcv_optimistic_update { + let signature_slot = latest_optimistic_update.signature_slot(); + if signature_slot != rcv_optimistic_update.signature_slot() { + return Err(Error::MismatchedSignatureSlot { + local: signature_slot, + observed: rcv_optimistic_update.signature_slot(), + }); + } + let local_attested_header_root = latest_optimistic_update.get_canonical_root(); + let observed_attested_header_root = rcv_optimistic_update.get_canonical_root(); + if local_attested_header_root != observed_attested_header_root { + return Err(Error::MismatchedAttestedHeader { + local_attested_header_root, + observed_attested_header_root, + signature_slot, + }); + } + return Err(Error::MismatchedSyncAggregate { + attested_header_root: local_attested_header_root, + signature_slot, + }); + } + + chain + .light_client_server_cache + .set_latest_broadcasted_optimistic_update(rcv_optimistic_update.clone()); + let parent_root = rcv_optimistic_update.get_parent_root(); Ok(Self { light_client_optimistic_update: rcv_optimistic_update, diff --git a/beacon_node/beacon_chain/src/light_client_server_cache.rs b/beacon_node/beacon_chain/src/light_client_server_cache.rs index 3099c451c0..22122ee554 100644 --- a/beacon_node/beacon_chain/src/light_client_server_cache.rs +++ b/beacon_node/beacon_chain/src/light_client_server_cache.rs @@ -40,6 +40,10 @@ pub struct LightClientServerCache { latest_written_current_sync_committee: RwLock>>>, /// Caches state proofs by block root prev_block_cache: Mutex>>, + /// Tracks the latest broadcasted finality update + latest_broadcasted_finality_update: RwLock>>, + /// Tracks the latest broadcasted optimistic update + latest_broadcasted_optimistic_update: RwLock>>, } impl LightClientServerCache { @@ -49,6 +53,8 @@ impl LightClientServerCache { latest_optimistic_update: None.into(), latest_light_client_update: None.into(), latest_written_current_sync_committee: None.into(), + latest_broadcasted_finality_update: None.into(), + latest_broadcasted_optimistic_update: None.into(), prev_block_cache: lru::LruCache::new(PREV_BLOCK_CACHE_SIZE).into(), } } @@ -334,10 +340,89 @@ impl LightClientServerCache { Ok(new_value) } + /// Checks if we've already broadcasted the latest finality update. + /// If we haven't, update the `latest_broadcasted_finality_update` cache + /// and return the latest finality update for broadcasting, else return `None`. + pub fn should_broadcast_latest_finality_update( + &self, + ) -> Option> { + if let Some(latest_finality_update) = self.get_latest_finality_update() { + let latest_broadcasted_finality_update = self.get_latest_broadcasted_finality_update(); + match latest_broadcasted_finality_update { + Some(latest_broadcasted_finality_update) => { + if latest_broadcasted_finality_update != latest_finality_update { + self.set_latest_broadcasted_finality_update(latest_finality_update.clone()); + return Some(latest_finality_update); + } + } + None => { + self.set_latest_broadcasted_finality_update(latest_finality_update.clone()); + return Some(latest_finality_update); + } + } + } + + None + } + pub fn get_latest_finality_update(&self) -> Option> { self.latest_finality_update.read().clone() } + pub fn get_latest_broadcasted_optimistic_update( + &self, + ) -> Option> { + self.latest_broadcasted_optimistic_update.read().clone() + } + + pub fn get_latest_broadcasted_finality_update( + &self, + ) -> Option> { + self.latest_broadcasted_finality_update.read().clone() + } + + pub fn set_latest_broadcasted_optimistic_update( + &self, + optimistic_update: LightClientOptimisticUpdate, + ) { + *self.latest_broadcasted_optimistic_update.write() = Some(optimistic_update.clone()); + } + + pub fn set_latest_broadcasted_finality_update( + &self, + finality_update: LightClientFinalityUpdate, + ) { + *self.latest_broadcasted_finality_update.write() = Some(finality_update.clone()); + } + + /// Checks if we've already broadcasted the latest optimistic update. + /// If we haven't, update the `latest_broadcasted_optimistic_update` cache + /// and return the latest optimistic update for broadcasting, else return `None`. + pub fn should_broadcast_latest_optimistic_update( + &self, + ) -> Option> { + if let Some(latest_optimistic_update) = self.get_latest_optimistic_update() { + let latest_broadcasted_optimistic_update = + self.get_latest_broadcasted_optimistic_update(); + match latest_broadcasted_optimistic_update { + Some(latest_broadcasted_optimistic_update) => { + if latest_broadcasted_optimistic_update != latest_optimistic_update { + self.set_latest_broadcasted_optimistic_update( + latest_optimistic_update.clone(), + ); + return Some(latest_optimistic_update); + } + } + None => { + self.set_latest_broadcasted_optimistic_update(latest_optimistic_update.clone()); + return Some(latest_optimistic_update); + } + } + } + + None + } + pub fn get_latest_optimistic_update(&self) -> Option> { self.latest_optimistic_update.read().clone() } diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index e9b2e8e6bf..2db93c0033 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -2600,7 +2600,7 @@ pub fn serve( let fork_name = chain .spec - .fork_name_at_slot::(*update.signature_slot()); + .fork_name_at_slot::(update.signature_slot()); match accept_header { Some(api_types::Accept::Ssz) => Response::builder() .status(200) diff --git a/beacon_node/http_api/src/sync_committees.rs b/beacon_node/http_api/src/sync_committees.rs index aa126bbc82..57c74f8d01 100644 --- a/beacon_node/http_api/src/sync_committees.rs +++ b/beacon_node/http_api/src/sync_committees.rs @@ -320,6 +320,38 @@ pub fn process_signed_contribution_and_proofs( let seen_timestamp = timestamp_now(); + if let Some(latest_optimistic_update) = chain + .light_client_server_cache + .should_broadcast_latest_optimistic_update() + { + let _ = publish_pubsub_message( + &network_tx, + PubsubMessage::LightClientOptimisticUpdate(Box::new(latest_optimistic_update)), + ) + .inspect_err(|e| { + error!( + error = ?e, + "Unable to broadcast latest light client optimistic update" + ); + }); + }; + + if let Some(latest_finality_update) = chain + .light_client_server_cache + .should_broadcast_latest_finality_update() + { + let _ = publish_pubsub_message( + &network_tx, + PubsubMessage::LightClientFinalityUpdate(Box::new(latest_finality_update)), + ) + .inspect_err(|e| { + error!( + error = ?e, + "Unable to broadcast latest light client finality update" + ); + }); + }; + // Verify contributions & broadcast to the network. for (index, contribution) in signed_contribution_and_proofs.into_iter().enumerate() { let aggregator_index = contribution.message.aggregator_index; diff --git a/beacon_node/lighthouse_network/src/rpc/mod.rs b/beacon_node/lighthouse_network/src/rpc/mod.rs index 0619908bb6..9807387a17 100644 --- a/beacon_node/lighthouse_network/src/rpc/mod.rs +++ b/beacon_node/lighthouse_network/src/rpc/mod.rs @@ -17,7 +17,7 @@ use std::marker::PhantomData; use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Duration; -use tracing::{debug, error, instrument, trace}; +use tracing::{debug, instrument, trace}; use types::{EthSpec, ForkContext}; pub(crate) use handler::{HandlerErr, HandlerEvent}; @@ -98,6 +98,13 @@ pub struct InboundRequestId { substream_id: SubstreamId, } +// An Active inbound request received via Rpc. +struct ActiveInboundRequest { + pub peer_id: PeerId, + pub request_type: RequestType, + pub peer_disconnected: bool, +} + impl InboundRequestId { /// Creates an _unchecked_ [`InboundRequestId`]. /// @@ -150,7 +157,7 @@ pub struct RPC { /// Rate limiter for our own requests. outbound_request_limiter: SelfRateLimiter, /// Active inbound requests that are awaiting a response. - active_inbound_requests: HashMap)>, + active_inbound_requests: HashMap>, /// Queue of events to be processed. events: Vec>, fork_context: Arc, @@ -199,8 +206,7 @@ impl RPC { } /// Sends an RPC response. - /// - /// The peer must be connected for this to succeed. + /// Returns an `Err` if the request does exist in the active inbound requests list. #[instrument(parent = None, level = "trace", fields(service = "libp2p_rpc"), @@ -209,14 +215,16 @@ impl RPC { )] pub fn send_response( &mut self, - peer_id: PeerId, request_id: InboundRequestId, response: RpcResponse, - ) { - let Some((_peer_id, request_type)) = self.active_inbound_requests.remove(&request_id) + ) -> Result<(), RpcResponse> { + let Some(ActiveInboundRequest { + peer_id, + request_type, + peer_disconnected, + }) = self.active_inbound_requests.remove(&request_id) else { - error!(%peer_id, ?request_id, %response, "Request not found in active_inbound_requests. Response not sent"); - return; + return Err(response); }; // Add the request back to active requests if the response is `Success` and requires stream @@ -224,11 +232,24 @@ impl RPC { if request_type.protocol().terminator().is_some() && matches!(response, RpcResponse::Success(_)) { - self.active_inbound_requests - .insert(request_id, (peer_id, request_type.clone())); + self.active_inbound_requests.insert( + request_id, + ActiveInboundRequest { + peer_id, + request_type: request_type.clone(), + peer_disconnected, + }, + ); + } + + if peer_disconnected { + trace!(%peer_id, ?request_id, %response, + "Discarding response, peer is no longer connected"); + return Ok(()); } self.send_response_inner(peer_id, request_type.protocol(), request_id, response); + Ok(()) } fn send_response_inner( @@ -425,9 +446,10 @@ where self.events.push(error_msg); } - self.active_inbound_requests.retain( - |_inbound_request_id, (request_peer_id, _request_type)| *request_peer_id != peer_id, - ); + self.active_inbound_requests + .values_mut() + .filter(|request| request.peer_id == peer_id) + .for_each(|request| request.peer_disconnected = true); if let Some(limiter) = self.response_limiter.as_mut() { limiter.peer_disconnected(peer_id); @@ -468,9 +490,17 @@ where .active_inbound_requests .iter() .filter( - |(_inbound_request_id, (request_peer_id, active_request_type))| { + |( + _inbound_request_id, + ActiveInboundRequest { + peer_id: request_peer_id, + request_type: active_request_type, + peer_disconnected, + }, + )| { *request_peer_id == peer_id && active_request_type.protocol() == request_type.protocol() + && !peer_disconnected }, ) .count() @@ -494,19 +524,25 @@ where } // Requests that are below the limit on the number of simultaneous requests are added to the active inbound requests. - self.active_inbound_requests - .insert(request_id, (peer_id, request_type.clone())); + self.active_inbound_requests.insert( + request_id, + ActiveInboundRequest { + peer_id, + request_type: request_type.clone(), + peer_disconnected: false, + }, + ); // If we received a Ping, we queue a Pong response. if let RequestType::Ping(_) = request_type { trace!(connection_id = %connection_id, %peer_id, "Received Ping, queueing Pong"); self.send_response( - peer_id, request_id, RpcResponse::Success(RpcSuccessResponse::Pong(Ping { data: self.seq_number, })), - ); + ) + .expect("Request to exist"); } self.events.push(ToSwarm::GenerateEvent(RPCMessage { diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index e2c6f24405..0f5745a3a2 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -11,8 +11,7 @@ use crate::peer_manager::{MIN_OUTBOUND_ONLY_FACTOR, PEER_EXCESS_FACTOR, PRIORITY use crate::rpc::methods::MetadataRequest; use crate::rpc::{ GoodbyeReason, HandlerErr, InboundRequestId, NetworkParams, Protocol, RPCError, RPCMessage, - RPCReceived, RequestType, ResponseTermination, RpcErrorResponse, RpcResponse, - RpcSuccessResponse, RPC, + RPCReceived, RequestType, ResponseTermination, RpcResponse, RpcSuccessResponse, RPC, }; use crate::types::{ all_topics_at_fork, core_topics_to_subscribe, is_fork_non_core_topic, subnet_from_topic_hash, @@ -39,7 +38,7 @@ use std::path::PathBuf; use std::pin::Pin; use std::sync::Arc; use std::time::Duration; -use tracing::{debug, info, instrument, trace, warn}; +use tracing::{debug, error, info, instrument, trace, warn}; use types::{ consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, EnrForkId, EthSpec, ForkContext, Slot, SubnetId, }; @@ -1146,35 +1145,22 @@ impl Network { name = "libp2p", skip_all )] - pub fn send_response( + pub fn send_response>>( &mut self, peer_id: PeerId, inbound_request_id: InboundRequestId, - response: Response, + response: T, ) { - self.eth2_rpc_mut() - .send_response(peer_id, inbound_request_id, response.into()) - } - - /// Inform the peer that their request produced an error. - #[instrument(parent = None, - level = "trace", - fields(service = "libp2p"), - name = "libp2p", - skip_all - )] - pub fn send_error_response( - &mut self, - peer_id: PeerId, - inbound_request_id: InboundRequestId, - error: RpcErrorResponse, - reason: String, - ) { - self.eth2_rpc_mut().send_response( - peer_id, - inbound_request_id, - RpcResponse::Error(error, reason.into()), - ) + if let Err(response) = self + .eth2_rpc_mut() + .send_response(inbound_request_id, response.into()) + { + if self.network_globals.peers.read().is_connected(&peer_id) { + error!(%peer_id, ?inbound_request_id, %response, + "Request not found in RPC active requests" + ); + } + } } /* Peer management functions */ @@ -1460,19 +1446,6 @@ impl Network { name = "libp2p", skip_all )] - fn send_meta_data_response( - &mut self, - _req: MetadataRequest, - inbound_request_id: InboundRequestId, - peer_id: PeerId, - ) { - let metadata = self.network_globals.local_metadata.read().clone(); - // The encoder is responsible for sending the negotiated version of the metadata - let event = RpcResponse::Success(RpcSuccessResponse::MetaData(Arc::new(metadata))); - self.eth2_rpc_mut() - .send_response(peer_id, inbound_request_id, event); - } - // RPC Propagation methods /// Queues the response to be sent upwards as long at it was requested outside the Behaviour. #[must_use = "return the response"] @@ -1760,9 +1733,13 @@ impl Network { self.peer_manager_mut().ping_request(&peer_id, ping.data); None } - RequestType::MetaData(req) => { + RequestType::MetaData(_req) => { // send the requested meta-data - self.send_meta_data_response(req, inbound_request_id, peer_id); + let metadata = self.network_globals.local_metadata.read().clone(); + // The encoder is responsible for sending the negotiated version of the metadata + let response = + RpcResponse::Success(RpcSuccessResponse::MetaData(Arc::new(metadata))); + self.send_response(peer_id, inbound_request_id, response); None } RequestType::Goodbye(reason) => { diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 6bdcd02197..0b17965f3c 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1967,7 +1967,10 @@ impl NetworkBeaconProcessor { Err(e) => { metrics::register_finality_update_error(&e); match e { - LightClientFinalityUpdateError::InvalidLightClientFinalityUpdate => { + LightClientFinalityUpdateError::MismatchedSignatureSlot { .. } + | LightClientFinalityUpdateError::MismatchedAttestedHeader { .. } + | LightClientFinalityUpdateError::MismatchedFinalizedHeader { .. } + | LightClientFinalityUpdateError::MismatchedProofOrSyncAggregate { .. } => { debug!( %peer_id, error = ?e, @@ -1999,6 +2002,7 @@ impl NetworkBeaconProcessor { error = ?e, "Light client error constructing finality update" ), + LightClientFinalityUpdateError::Ignore => {} } self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); } @@ -2080,7 +2084,9 @@ impl NetworkBeaconProcessor { } return; } - LightClientOptimisticUpdateError::InvalidLightClientOptimisticUpdate => { + LightClientOptimisticUpdateError::MismatchedSignatureSlot { .. } + | LightClientOptimisticUpdateError::MismatchedAttestedHeader { .. } + | LightClientOptimisticUpdateError::MismatchedSyncAggregate { .. } => { metrics::register_optimistic_update_error(&e); debug!( @@ -2119,6 +2125,7 @@ impl NetworkBeaconProcessor { "Light client error constructing optimistic update" ) } + LightClientOptimisticUpdateError::Ignore => {} } self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 0a6d515232..89f71dc367 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -11,6 +11,7 @@ use futures::channel::mpsc::Sender; use futures::future::OptionFuture; use futures::prelude::*; +use lighthouse_network::rpc::methods::RpcResponse; use lighthouse_network::rpc::InboundRequestId; use lighthouse_network::rpc::RequestType; use lighthouse_network::service::Network; @@ -627,10 +628,11 @@ impl NetworkService { error, inbound_request_id, reason, - } => { - self.libp2p - .send_error_response(peer_id, inbound_request_id, error, reason); - } + } => self.libp2p.send_response( + peer_id, + inbound_request_id, + RpcResponse::Error(error, reason.into()), + ), NetworkMessage::ValidationResult { propagation_source, message_id, diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index cc49c43711..e3794bd2be 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -153,25 +153,21 @@ impl SyncingChain { } /// Check if the chain has peers from which to process batches. - #[instrument(parent = None,fields(chain = self.id , service = "range_sync"), skip_all)] pub fn available_peers(&self) -> usize { self.peers.len() } /// Get the chain's id. - #[instrument(parent = None, fields(chain = self.id , service = "range_sync"), skip_all)] pub fn id(&self) -> ChainId { self.id } /// Peers currently syncing this chain. - #[instrument(parent = None, fields(chain = self.id , service = "range_sync"), skip_all)] pub fn peers(&self) -> impl Iterator + '_ { self.peers.iter().cloned() } /// Progress in epochs made by the chain - #[instrument(parent = None, fields(chain = self.id , service = "range_sync"), skip_all)] pub fn processed_epochs(&self) -> u64 { self.processing_target .saturating_sub(self.start_epoch) @@ -179,7 +175,6 @@ impl SyncingChain { } /// Returns the total count of pending blocks in all the batches of this chain - #[instrument(parent = None, fields(chain = self.id , service = "range_sync"), skip_all)] pub fn pending_blocks(&self) -> usize { self.batches .values() @@ -189,7 +184,6 @@ impl SyncingChain { /// Removes a peer from the chain. /// If the peer has active batches, those are considered failed and re-requested. - #[instrument(parent = None, fields(chain = self.id , service = "range_sync"), skip_all)] pub fn remove_peer(&mut self, peer_id: &PeerId) -> ProcessingResult { self.peers.remove(peer_id); @@ -201,7 +195,6 @@ impl SyncingChain { } /// Returns the latest slot number that has been processed. - #[instrument(parent = None, fields(chain = self.id , service = "range_sync"), skip_all)] fn current_processed_slot(&self) -> Slot { // the last slot we processed was included in the previous batch, and corresponds to the // first slot of the current target epoch @@ -959,7 +952,6 @@ impl SyncingChain { } /// Returns true if this chain is currently syncing. - #[instrument(parent = None, fields(chain = self.id , service = "range_sync"), skip_all)] pub fn is_syncing(&self) -> bool { match self.state { ChainSyncingState::Syncing => true, @@ -1115,7 +1107,6 @@ impl SyncingChain { /// This produces a string of the form: [D,E,E,E,E] /// to indicate the current buffer state of the chain. The symbols are defined on each of the /// batch states. See [BatchState::visualize] for symbol definitions. - #[instrument(parent = None, fields(chain = self.id , service = "range_sync"), skip_all)] fn visualize_batch_state(&self) -> String { let mut visualization_string = String::with_capacity((BATCH_BUFFER_SIZE * 3) as usize); diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index f3f9aa97a2..eb27a03552 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -822,7 +822,7 @@ pub fn cli_app() -> Command { .long("state-cache-size") .value_name("STATE_CACHE_SIZE") .help("Specifies the size of the state cache") - .default_value("32") + .default_value("128") .action(ArgAction::Set) .display_order(0) ) diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 3c6339c03e..f55b91d58c 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -278,7 +278,7 @@ pub fn get_config( } if clap_utils::parse_optional::(cli_args, "eth1-cache-follow-distance")?.is_some() { - warn!("The eth1-purge-cache flag is deprecated"); + warn!("The eth1-cache-follow-distance flag is deprecated"); } // `--execution-endpoint` is required now. diff --git a/beacon_node/store/src/state_cache.rs b/beacon_node/store/src/state_cache.rs index b6aacbb77a..f8acd28f32 100644 --- a/beacon_node/store/src/state_cache.rs +++ b/beacon_node/store/src/state_cache.rs @@ -345,7 +345,9 @@ impl StateCache { let mut old_boundary_state_roots = vec![]; let mut good_boundary_state_roots = vec![]; - for (&state_root, (_, state)) in self.states.iter().skip(cull_exempt) { + // Skip the `cull_exempt` most-recently used, then reverse the iterator to start at + // least-recently used states. + for (&state_root, (_, state)) in self.states.iter().skip(cull_exempt).rev() { let is_advanced = state.slot() > state.latest_block_header().slot; let is_boundary = state.slot() % E::slots_per_epoch() == 0; let could_finalize = diff --git a/book/src/help_bn.md b/book/src/help_bn.md index b2d2af6cec..642add152e 100644 --- a/book/src/help_bn.md +++ b/book/src/help_bn.md @@ -381,7 +381,7 @@ Options: Minimum number of states to cull from the state cache when it gets full [default: 1] --state-cache-size - Specifies the size of the state cache [default: 32] + Specifies the size of the state cache [default: 128] --suggested-fee-recipient Emergency fallback fee recipient for use in case the validator client does not have one configured. You should set this flag on the diff --git a/boot_node/Cargo.toml b/boot_node/Cargo.toml index d1b059f3b2..07513d6ab2 100644 --- a/boot_node/Cargo.toml +++ b/boot_node/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "boot_node" -version = "7.1.0-beta.0" +version = "7.1.0" authors = ["Sigma Prime "] edition = { workspace = true } diff --git a/common/lighthouse_version/src/lib.rs b/common/lighthouse_version/src/lib.rs index b20708e7b0..238efd591a 100644 --- a/common/lighthouse_version/src/lib.rs +++ b/common/lighthouse_version/src/lib.rs @@ -17,8 +17,8 @@ pub const VERSION: &str = git_version!( // NOTE: using --match instead of --exclude for compatibility with old Git "--match=thiswillnevermatchlol" ], - prefix = "Lighthouse/v7.1.0-beta.0-", - fallback = "Lighthouse/v7.1.0-beta.0" + prefix = "Lighthouse/v7.1.0-", + fallback = "Lighthouse/v7.1.0" ); /// Returns the first eight characters of the latest commit hash for this build. @@ -54,7 +54,7 @@ pub fn version_with_platform() -> String { /// /// `1.5.1` pub fn version() -> &'static str { - "7.1.0-beta.0" + "7.1.0" } /// Returns the name of the current client running. diff --git a/consensus/types/src/light_client_finality_update.rs b/consensus/types/src/light_client_finality_update.rs index 9189dcd0a0..2125b4668b 100644 --- a/consensus/types/src/light_client_finality_update.rs +++ b/consensus/types/src/light_client_finality_update.rs @@ -79,6 +79,7 @@ pub struct LightClientFinalityUpdate { /// current sync aggregate pub sync_aggregate: SyncAggregate, /// Slot of the sync aggregated signature + #[superstruct(getter(copy))] pub signature_slot: Slot, } @@ -179,6 +180,20 @@ impl LightClientFinalityUpdate { }) } + pub fn get_attested_header_root<'a>(&'a self) -> Hash256 { + map_light_client_finality_update_ref!(&'a _, self.to_ref(), |inner, cons| { + cons(inner); + inner.attested_header.beacon.canonical_root() + }) + } + + pub fn get_finalized_header_root<'a>(&'a self) -> Hash256 { + map_light_client_finality_update_ref!(&'a _, self.to_ref(), |inner, cons| { + cons(inner); + inner.finalized_header.beacon.canonical_root() + }) + } + pub fn from_ssz_bytes(bytes: &[u8], fork_name: ForkName) -> Result { let finality_update = match fork_name { ForkName::Altair | ForkName::Bellatrix => { @@ -227,7 +242,7 @@ impl LightClientFinalityUpdate { if attested_slot > prev_slot { true } else { - attested_slot == prev_slot && signature_slot > *self.signature_slot() + attested_slot == prev_slot && signature_slot > self.signature_slot() } } } diff --git a/consensus/types/src/light_client_optimistic_update.rs b/consensus/types/src/light_client_optimistic_update.rs index 5701ebd875..13e308cd27 100644 --- a/consensus/types/src/light_client_optimistic_update.rs +++ b/consensus/types/src/light_client_optimistic_update.rs @@ -60,6 +60,7 @@ pub struct LightClientOptimisticUpdate { /// current sync aggregate pub sync_aggregate: SyncAggregate, /// Slot of the sync aggregated signature + #[superstruct(getter(copy))] pub signature_slot: Slot, } @@ -200,7 +201,7 @@ impl LightClientOptimisticUpdate { if attested_slot > prev_slot { true } else { - attested_slot == prev_slot && signature_slot > *self.signature_slot() + attested_slot == prev_slot && signature_slot > self.signature_slot() } } } diff --git a/lcli/Cargo.toml b/lcli/Cargo.toml index b39feb5011..a54c10dc68 100644 --- a/lcli/Cargo.toml +++ b/lcli/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "lcli" description = "Lighthouse CLI (modeled after zcli)" -version = "7.1.0-beta.0" +version = "7.1.0" authors = ["Paul Hauner "] edition = { workspace = true } diff --git a/lighthouse/Cargo.toml b/lighthouse/Cargo.toml index fdda1696b1..6a8fa00c1e 100644 --- a/lighthouse/Cargo.toml +++ b/lighthouse/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lighthouse" -version = "7.1.0-beta.0" +version = "7.1.0" authors = ["Sigma Prime "] edition = { workspace = true } autotests = false diff --git a/lighthouse/src/main.rs b/lighthouse/src/main.rs index bbd8f764e7..10168d026f 100644 --- a/lighthouse/src/main.rs +++ b/lighthouse/src/main.rs @@ -126,6 +126,16 @@ fn main() { .global(true) .display_order(0), ) + .arg( + Arg::new("logfile") + .long("logfile") + .value_name("PATH") + .help("DEPRECATED") + .action(ArgAction::Set) + .global(true) + .hide(true) + .display_order(0) + ) .arg( Arg::new("logfile-dir") .long("logfile-dir") @@ -701,6 +711,11 @@ fn run( // Allow Prometheus access to the version and commit of the Lighthouse build. metrics::expose_lighthouse_version(); + // DEPRECATED: can be removed in v7.2.0/v8.0.0. + if clap_utils::parse_optional::(matches, "logfile")?.is_some() { + warn!("The --logfile flag is deprecated and replaced by --logfile-dir"); + } + #[cfg(all(feature = "modern", target_arch = "x86_64"))] if !std::is_x86_feature_detected!("adx") { tracing::warn!( diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index 26b6c8ff0e..884e5eddeb 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -1829,7 +1829,7 @@ fn block_cache_size_flag() { fn state_cache_size_default() { CommandLineTest::new() .run_with_zero_port() - .with_config(|config| assert_eq!(config.store.state_cache_size, new_non_zero_usize(32))); + .with_config(|config| assert_eq!(config.store.state_cache_size, new_non_zero_usize(128))); } #[test] fn state_cache_size_flag() { @@ -2476,6 +2476,13 @@ fn logfile_format_flag() { ) }); } +// DEPRECATED but should not crash. +#[test] +fn deprecated_logfile() { + CommandLineTest::new() + .flag("logfile", Some("test.txt")) + .run_with_zero_port(); +} // DEPRECATED but should not crash. #[test] diff --git a/testing/simulator/src/checks.rs b/testing/simulator/src/checks.rs index 1b2d4024d1..e7cc9b7a4e 100644 --- a/testing/simulator/src/checks.rs +++ b/testing/simulator/src/checks.rs @@ -303,7 +303,7 @@ pub(crate) async fn verify_light_client_updates( } // Verify light client optimistic update. `signature_slot_distance` should be 1 in the ideal scenario. - let signature_slot = *client + let signature_slot = client .get_beacon_light_client_optimistic_update::() .await .map_err(|e| format!("Error while getting light client updates: {:?}", e))? @@ -332,7 +332,7 @@ pub(crate) async fn verify_light_client_updates( } continue; } - let signature_slot = *client + let signature_slot = client .get_beacon_light_client_finality_update::() .await .map_err(|e| format!("Error while getting light client updates: {:?}", e))?