Compare commits

...

4 Commits

Author SHA1 Message Date
Michael Sproul
9e12c21f26 Release v5.2.1 (testing branch) (#5989)
* Release v5.2.1
2024-06-28 02:10:32 +00:00
Lion - dapplion
b38019cb10 Attempt to continue lookups after adding peers (#5993)
* Attempt to continue lookups after adding peers
2024-06-26 23:53:55 +00:00
Pawan Dhananjay
bf4cbd3b0a Remove all batches related to a peer on disconnect (#5969)
* Remove all batches related to a peer on disconnect

* Cleanup map entries after disconnect

* Allow lookups to continue in case of disconnections

* Pretty response types

* fmt

* Fix lints

* Remove lookup if it cannot progress

* Fix tests

* Remove poll_close on rpc behaviour

* Remove redundant test

* Fix issue raised by lion

* Revert pretty response types

* Cleanup

* Fix test

* Merge remote-tracking branch 'origin/release-v5.2.1' into rpc-error-on-disconnect-revert

* Apply suggestions from joao

Co-authored-by: João Oliveira <hello@jxs.pt>

* Fix log

* update request status on no peers found

* Do not remove lookup after peer disconnection

* Add comments about expected event api

* Update single_block_lookup.rs

* Update mod.rs

* Merge branch 'rpc-error-on-disconnect-revert' into 5969-review

* Merge pull request #10 from dapplion/5969-review

Add comments about expected event api
2024-06-26 23:53:53 +00:00
chonghe
758b58c9e9 Update slasher DB size and Lighthouse book (#5934)
* Update book

* Fix

* mdlint

* Revise

* Update slasher doc

* Revise max db size

* change blob to file

* Add checkpoint-blobs

* Thanks Jimmy for the command

* Update schema docs
2024-06-23 23:52:15 +00:00
24 changed files with 346 additions and 216 deletions

8
Cargo.lock generated
View File

@@ -855,7 +855,7 @@ dependencies = [
[[package]]
name = "beacon_node"
version = "5.2.0"
version = "5.2.1"
dependencies = [
"beacon_chain",
"clap",
@@ -1061,7 +1061,7 @@ dependencies = [
[[package]]
name = "boot_node"
version = "5.2.0"
version = "5.2.1"
dependencies = [
"beacon_node",
"clap",
@@ -4322,7 +4322,7 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "lcli"
version = "5.2.0"
version = "5.2.1"
dependencies = [
"account_utils",
"beacon_chain",
@@ -4893,7 +4893,7 @@ dependencies = [
[[package]]
name = "lighthouse"
version = "5.2.0"
version = "5.2.1"
dependencies = [
"account_manager",
"account_utils",

View File

@@ -1,6 +1,6 @@
[package]
name = "beacon_node"
version = "5.2.0"
version = "5.2.1"
authors = [
"Paul Hauner <paul@paulhauner.com>",
"Age Manning <Age@AgeManning.com",

View File

@@ -352,37 +352,6 @@ where
!matches!(self.state, HandlerState::Deactivated)
}
// NOTE: This function gets polled to completion upon a connection close.
fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<Option<Self::ToBehaviour>> {
// Inform the network behaviour of any failed requests
while let Some(substream_id) = self.outbound_substreams.keys().next().cloned() {
let outbound_info = self
.outbound_substreams
.remove(&substream_id)
.expect("The value must exist for a key");
// If the state of the connection is closing, we do not need to report this case to
// the behaviour, as the connection has just closed non-gracefully
if matches!(outbound_info.state, OutboundSubstreamState::Closing(_)) {
continue;
}
// Register this request as an RPC Error
return Poll::Ready(Some(HandlerEvent::Err(HandlerErr::Outbound {
error: RPCError::Disconnected,
proto: outbound_info.proto,
id: outbound_info.req_id,
})));
}
// Also handle any events that are awaiting to be sent to the behaviour
if !self.events_out.is_empty() {
return Poll::Ready(Some(self.events_out.remove(0)));
}
Poll::Ready(None)
}
fn poll(
&mut self,
cx: &mut Context<'_>,

View File

@@ -3,7 +3,7 @@
mod common;
use common::Protocol;
use lighthouse_network::rpc::{methods::*, RPCError};
use lighthouse_network::rpc::methods::*;
use lighthouse_network::{rpc::max_rpc_size, NetworkEvent, ReportSource, Request, Response};
use slog::{debug, warn, Level};
use ssz::Encode;
@@ -1012,98 +1012,6 @@ fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() {
})
}
#[test]
fn test_disconnect_triggers_rpc_error() {
// set up the logging. The level and enabled logging or not
let log_level = Level::Debug;
let enable_logging = false;
let log = common::build_log(log_level, enable_logging);
let spec = E::default_spec();
let rt = Arc::new(Runtime::new().unwrap());
// get sender/receiver
rt.block_on(async {
let (mut sender, mut receiver) = common::build_node_pair(
Arc::downgrade(&rt),
&log,
ForkName::Base,
&spec,
Protocol::Tcp,
)
.await;
// BlocksByRoot Request
let rpc_request = Request::BlocksByRoot(BlocksByRootRequest::new(
// Must have at least one root for the request to create a stream
vec![Hash256::from_low_u64_be(0)],
&spec,
));
// build the sender future
let sender_future = async {
loop {
match sender.next_event().await {
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender
.send_request(peer_id, 42, rpc_request.clone())
.unwrap();
}
NetworkEvent::RPCFailed { error, id: 42, .. } => match error {
RPCError::Disconnected => return,
other => panic!("received unexpected error {:?}", other),
},
other => {
warn!(log, "Ignoring other event {:?}", other);
}
}
}
};
// determine messages to send (PeerId, RequestId). If some, indicates we still need to send
// messages
let mut sending_peer = None;
let receiver_future = async {
loop {
// this future either drives the sending/receiving or times out allowing messages to be
// sent in the timeout
match futures::future::select(
Box::pin(receiver.next_event()),
Box::pin(tokio::time::sleep(Duration::from_secs(1))),
)
.await
{
futures::future::Either::Left((ev, _)) => match ev {
NetworkEvent::RequestReceived { peer_id, .. } => {
sending_peer = Some(peer_id);
}
other => {
warn!(log, "Ignoring other event {:?}", other);
}
},
futures::future::Either::Right((_, _)) => {} // The timeout hit, send messages if required
}
// if we need to send messages send them here. This will happen after a delay
if let Some(peer_id) = sending_peer.take() {
warn!(log, "Receiver got request, disconnecting peer");
receiver.__hard_disconnect_testing_only(peer_id);
}
}
};
tokio::select! {
_ = sender_future => {}
_ = receiver_future => {}
_ = sleep(Duration::from_secs(30)) => {
panic!("Future timed out");
}
}
})
}
/// Establishes a pair of nodes and disconnects the pair based on the selected protocol via an RPC
/// Goodbye message.
fn goodbye_test(log_level: Level, enable_logging: bool, protocol: Protocol) {

View File

@@ -307,7 +307,11 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
/// A peer has disconnected.
/// If the peer has active batches, those are considered failed and re-requested.
#[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"]
pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), BackFillError> {
pub fn peer_disconnected(
&mut self,
peer_id: &PeerId,
network: &mut SyncNetworkContext<T>,
) -> Result<(), BackFillError> {
if matches!(
self.state(),
BackFillState::Failed | BackFillState::NotRequired
@@ -315,7 +319,37 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
return Ok(());
}
self.active_requests.remove(peer_id);
if let Some(batch_ids) = self.active_requests.remove(peer_id) {
// fail the batches.
for id in batch_ids {
if let Some(batch) = self.batches.get_mut(&id) {
match batch.download_failed(false) {
Ok(BatchOperationOutcome::Failed { blacklist: _ }) => {
self.fail_sync(BackFillError::BatchDownloadFailed(id))?;
}
Ok(BatchOperationOutcome::Continue) => {}
Err(e) => {
self.fail_sync(BackFillError::BatchInvalidState(id, e.0))?;
}
}
// If we have run out of peers in which to retry this batch, the backfill state
// transitions to a paused state.
// We still need to reset the state for all the affected batches, so we should not
// short circuit early.
if self.retry_batch_download(network, id).is_err() {
debug!(
self.log,
"Batch could not be retried";
"batch_id" => id,
"error" => "no synced peers"
);
}
} else {
debug!(self.log, "Batch not found while removing peer";
"peer" => %peer_id, "batch" => id)
}
}
}
// Remove the peer from the participation list
self.participating_peers.remove(peer_id);

View File

@@ -1,3 +1,25 @@
//! Implements block lookup sync.
//!
//! Block lookup sync is triggered when a peer claims to have imported a block we don't know about.
//! For example, a peer attesting to a head block root that is not in our fork-choice. Lookup sync
//! is recursive in nature, as we may discover that this attested head block root has a parent that
//! is also unknown to us.
//!
//! Block lookup is implemented as an event-driven state machine. It sends events to the network and
//! beacon processor, and expects some set of events back. A discrepancy in the expected event API
//! will result in lookups getting "stuck". A lookup becomes stuck when there is no future event
//! that will trigger the lookup to make progress. There's a fallback mechanism that drops lookups
//! that live for too long, logging the line "Notify the devs a sync lookup is stuck".
//!
//! The expected event API is documented in the code paths that are making assumptions with the
//! comment prefix "Lookup sync event safety:"
//!
//! Block lookup sync attempts to not re-download or re-process data that we already have. Block
//! components are cached temporarily in multiple places before they are imported into fork-choice.
//! Therefore, block lookup sync must peek these caches correctly to decide when to skip a download
//! or consider a lookup complete. These caches are read from the `SyncNetworkContext` and its state
//! returned to this module as `LookupRequestResult` variants.
use self::parent_chain::{compute_parent_chains, NodeChain};
pub use self::single_block_lookup::DownloadResult;
use self::single_block_lookup::{LookupRequestError, LookupResult, SingleBlockLookup};
@@ -269,7 +291,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
}
if let Err(e) = self.add_peers_to_lookup_and_ancestors(lookup_id, peers) {
if let Err(e) = self.add_peers_to_lookup_and_ancestors(lookup_id, peers, cx) {
warn!(self.log, "Error adding peers to ancestor lookup"; "error" => ?e);
}
@@ -410,21 +432,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
/* Error responses */
pub fn peer_disconnected(&mut self, peer_id: &PeerId) {
self.single_block_lookups.retain(|_, lookup| {
for (_, lookup) in self.single_block_lookups.iter_mut() {
lookup.remove_peer(peer_id);
// Note: this condition should be removed in the future. It's not strictly necessary to drop a
// lookup if there are no peers left. Lookup should only be dropped if it can not make progress
if lookup.has_no_peers() {
debug!(self.log,
"Dropping single lookup after peer disconnection";
"block_root" => ?lookup.block_root()
);
false
} else {
true
}
});
}
}
/* Processing responses */
@@ -787,12 +797,12 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
};
if stuck_lookup.id == ancestor_stuck_lookup.id {
warn!(self.log, "Notify the devs, a sync lookup is stuck";
warn!(self.log, "Notify the devs a sync lookup is stuck";
"block_root" => ?stuck_lookup.block_root(),
"lookup" => ?stuck_lookup,
);
} else {
warn!(self.log, "Notify the devs, a sync lookup is stuck";
warn!(self.log, "Notify the devs a sync lookup is stuck";
"block_root" => ?stuck_lookup.block_root(),
"lookup" => ?stuck_lookup,
"ancestor_block_root" => ?ancestor_stuck_lookup.block_root(),
@@ -834,14 +844,17 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
&mut self,
lookup_id: SingleLookupId,
peers: &[PeerId],
cx: &mut SyncNetworkContext<T>,
) -> Result<(), String> {
let lookup = self
.single_block_lookups
.get_mut(&lookup_id)
.ok_or(format!("Unknown lookup for id {lookup_id}"))?;
let mut added_some_peer = false;
for peer in peers {
if lookup.add_peer(*peer) {
added_some_peer = true;
debug!(self.log, "Adding peer to existing single block lookup";
"block_root" => ?lookup.block_root(),
"peer" => ?peer
@@ -849,22 +862,25 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
}
// We may choose to attempt to continue a lookup here. It is possible that a lookup had zero
// peers and after adding this set of peers it can make progress again. Note that this
// recursive function iterates from child to parent, so continuing the child first is weird.
// However, we choose to not attempt to continue the lookup for simplicity. It's not
// strictly required and just and optimization for a rare corner case.
if let Some(parent_root) = lookup.awaiting_parent() {
if let Some((&child_id, _)) = self
.single_block_lookups
.iter()
.find(|(_, l)| l.block_root() == parent_root)
{
self.add_peers_to_lookup_and_ancestors(child_id, peers)
self.add_peers_to_lookup_and_ancestors(child_id, peers, cx)
} else {
Err(format!("Lookup references unknown parent {parent_root:?}"))
}
} else if added_some_peer {
// If this lookup is not awaiting a parent and we added at least one peer, attempt to
// make progress. It is possible that a lookup is created with zero peers, attempted to
// make progress, and then receives peers. After that time the lookup will never be
// pruned with `drop_lookups_without_peers` because it has peers. This is a rare corner
// case, but it can result in stuck lookups.
let result = lookup.continue_requests(cx);
self.on_lookup_result(lookup_id, result, "add_peers", cx);
Ok(())
} else {
Ok(())
}

View File

@@ -197,21 +197,36 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
}
let Some(peer_id) = self.use_rand_available_peer() else {
// Allow lookup to not have any peers. In that case do nothing. If the lookup does
// not have peers for some time, it will be dropped.
// Allow lookup to not have any peers and do nothing. This is an optimization to not
// lose progress of lookups created from a block with unknown parent before we receive
// attestations for said block.
// Lookup sync event safety: If a lookup requires peers to make progress, and does
// not receive any new peers for some time it will be dropped. If it receives a new
// peer it must attempt to make progress.
R::request_state_mut(self)
.get_state_mut()
.update_awaiting_download_status("no peers");
return Ok(());
};
let request = R::request_state_mut(self);
match request.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? {
LookupRequestResult::RequestSent(req_id) => {
// Lookup sync event safety: If make_request returns `RequestSent`, we are
// guaranteed that `BlockLookups::on_download_response` will be called exactly
// with this `req_id`.
request.get_state_mut().on_download_start(req_id)?
}
LookupRequestResult::NoRequestNeeded => {
// Lookup sync event safety: Advances this request to the terminal `Processed`
// state. If all requests reach this state, the request is marked as completed
// in `Self::continue_requests`.
request.get_state_mut().on_completed_request()?
}
// Sync will receive a future event to make progress on the request, do nothing now
LookupRequestResult::Pending(reason) => {
// Lookup sync event safety: Refer to the code paths constructing
// `LookupRequestResult::Pending`
request
.get_state_mut()
.update_awaiting_download_status(reason);
@@ -222,16 +237,28 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
// Otherwise, attempt to progress awaiting processing
// If this request is awaiting a parent lookup to be processed, do not send for processing.
// The request will be rejected with unknown parent error.
//
// TODO: The condition `block_is_processed || Block` can be dropped after checking for
// unknown parent root when importing RPC blobs
} else if !awaiting_parent
&& (block_is_processed || matches!(R::response_type(), ResponseType::Block))
{
// maybe_start_processing returns Some if state == AwaitingProcess. This pattern is
// useful to conditionally access the result data.
if let Some(result) = request.get_state_mut().maybe_start_processing() {
// Lookup sync event safety: If `send_for_processing` returns Ok() we are guaranteed
// that `BlockLookups::on_processing_result` will be called exactly once with this
// lookup_id
return R::send_for_processing(id, result, cx);
}
// Lookup sync event safety: If the request is not in `AwaitingDownload` or
// `AwaitingProcessing` state it is guaranteed to receive some event to make progress.
}
// Lookup sync event safety: If a lookup is awaiting a parent we are guaranteed to either:
// (1) attempt to make progress with `BlockLookups::continue_child_lookups` if the parent
// lookup completes, or (2) get dropped if the parent fails and is dropped.
Ok(())
}
@@ -246,10 +273,9 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
self.peers.insert(peer_id)
}
/// Remove peer from available peers. Return true if there are no more available peers and all
/// requests are not expecting any future event (AwaitingDownload).
pub fn remove_peer(&mut self, peer_id: &PeerId) -> bool {
self.peers.remove(peer_id)
/// Remove peer from available peers.
pub fn remove_peer(&mut self, peer_id: &PeerId) {
self.peers.remove(peer_id);
}
/// Returns true if this lookup has zero peers

View File

@@ -290,6 +290,7 @@ impl TestRig {
.0
}
#[track_caller]
fn expect_no_active_single_lookups(&self) {
assert!(
self.active_single_lookups().is_empty(),
@@ -298,6 +299,7 @@ impl TestRig {
);
}
#[track_caller]
fn expect_no_active_lookups(&self) {
self.expect_no_active_single_lookups();
}
@@ -539,10 +541,6 @@ impl TestRig {
})
}
fn peer_disconnected(&mut self, disconnected_peer_id: PeerId) {
self.send_sync_message(SyncMessage::Disconnect(disconnected_peer_id));
}
/// Return RPCErrors for all active requests of peer
fn rpc_error_all_active_requests(&mut self, disconnected_peer_id: PeerId) {
self.drain_network_rx();
@@ -562,6 +560,10 @@ impl TestRig {
}
}
fn peer_disconnected(&mut self, peer_id: PeerId) {
self.send_sync_message(SyncMessage::Disconnect(peer_id));
}
fn drain_network_rx(&mut self) {
while let Ok(event) = self.network_rx.try_recv() {
self.network_rx_queue.push(event);
@@ -1026,6 +1028,28 @@ fn test_single_block_lookup_failure() {
rig.expect_empty_network();
}
/// Checks that a single block lookup is kept alive when the peer's disconnect event is
/// delivered to sync before the RPC error for the request that was sent to that peer.
#[test]
fn test_single_block_lookup_peer_disconnected_then_rpc_error() {
let mut rig = TestRig::test_setup();
let block_hash = Hash256::random();
let peer_id = rig.new_connected_peer();
// Trigger the request.
rig.trigger_unknown_block_from_attestation(block_hash, peer_id);
let id = rig.expect_block_lookup_request(block_hash);
// The peer disconnect event reaches sync before the rpc error.
rig.peer_disconnected(peer_id);
// The lookup is not removed as it can still potentially make progress.
rig.assert_single_lookups_count(1);
// The request fails.
rig.single_lookup_failed(id, peer_id, RPCError::Disconnected);
// A block lookup request for the same root is expected to be observed at this point.
rig.expect_block_lookup_request(block_hash);
// The request should be removed from the network context on disconnection.
rig.expect_empty_network();
}
#[test]
fn test_single_block_lookup_becomes_parent_request() {
let mut rig = TestRig::test_setup();
@@ -1289,19 +1313,9 @@ fn test_lookup_peer_disconnected_no_peers_left_while_request() {
rig.trigger_unknown_parent_block(peer_id, trigger_block.into());
rig.peer_disconnected(peer_id);
rig.rpc_error_all_active_requests(peer_id);
rig.expect_no_active_lookups();
}
#[test]
fn test_lookup_peer_disconnected_no_peers_left_not_while_request() {
let mut rig = TestRig::test_setup();
let peer_id = rig.new_connected_peer();
let trigger_block = rig.rand_block();
rig.trigger_unknown_parent_block(peer_id, trigger_block.into());
rig.peer_disconnected(peer_id);
// Note: this test case may be removed in the future. It's not strictly necessary to drop a
// lookup if there are no peers left. Lookup should only be dropped if it can not make progress
rig.expect_no_active_lookups();
// Erroring all rpc requests and disconnecting the peer shouldn't remove the requests
// from the lookups map as they can still progress.
rig.assert_single_lookups_count(2);
}
#[test]

View File

@@ -1,4 +1,5 @@
use beacon_chain::block_verification_types::RpcBlock;
use lighthouse_network::PeerId;
use ssz_types::VariableList;
use std::{collections::VecDeque, sync::Arc};
use types::{BlobSidecar, EthSpec, SignedBeaconBlock};
@@ -17,16 +18,19 @@ pub struct BlocksAndBlobsRequestInfo<E: EthSpec> {
is_sidecars_stream_terminated: bool,
/// Used to determine if this accumulator should wait for a sidecars stream termination
request_type: ByRangeRequestType,
/// The peer the request was made to.
pub(crate) peer_id: PeerId,
}
impl<E: EthSpec> BlocksAndBlobsRequestInfo<E> {
pub fn new(request_type: ByRangeRequestType) -> Self {
pub fn new(request_type: ByRangeRequestType, peer_id: PeerId) -> Self {
Self {
accumulated_blocks: <_>::default(),
accumulated_sidecars: <_>::default(),
is_blocks_stream_terminated: <_>::default(),
is_sidecars_stream_terminated: <_>::default(),
request_type,
peer_id,
}
}
@@ -109,12 +113,14 @@ mod tests {
use super::BlocksAndBlobsRequestInfo;
use crate::sync::range_sync::ByRangeRequestType;
use beacon_chain::test_utils::{generate_rand_block_and_blobs, NumBlobs};
use lighthouse_network::PeerId;
use rand::SeedableRng;
use types::{test_utils::XorShiftRng, ForkName, MinimalEthSpec as E};
#[test]
fn no_blobs_into_responses() {
let mut info = BlocksAndBlobsRequestInfo::<E>::new(ByRangeRequestType::Blocks);
let peer_id = PeerId::random();
let mut info = BlocksAndBlobsRequestInfo::<E>::new(ByRangeRequestType::Blocks, peer_id);
let mut rng = XorShiftRng::from_seed([42; 16]);
let blocks = (0..4)
.map(|_| generate_rand_block_and_blobs::<E>(ForkName::Base, NumBlobs::None, &mut rng).0)
@@ -133,7 +139,9 @@ mod tests {
#[test]
fn empty_blobs_into_responses() {
let mut info = BlocksAndBlobsRequestInfo::<E>::new(ByRangeRequestType::BlocksAndBlobs);
let peer_id = PeerId::random();
let mut info =
BlocksAndBlobsRequestInfo::<E>::new(ByRangeRequestType::BlocksAndBlobs, peer_id);
let mut rng = XorShiftRng::from_seed([42; 16]);
let blocks = (0..4)
.map(|_| {

View File

@@ -372,16 +372,39 @@ impl<T: BeaconChainTypes> SyncManager<T> {
Err(_) => self.update_sync_state(),
},
}
} else {
debug!(
self.log,
"RPC error for range request has no associated entry in network context, ungraceful disconnect";
"peer_id" => %peer_id,
"request_id" => %id,
"error" => ?error,
);
}
}
}
}
/// Handles a peer disconnect.
///
/// It is important that a peer disconnect retries all the batches/lookups as
/// there is no way to guarantee that libp2p always emits a error along with
/// the disconnect.
fn peer_disconnect(&mut self, peer_id: &PeerId) {
// Inject a Disconnected error on all requests associated with the disconnected peer
// to retry all batches/lookups
for request_id in self.network.peer_disconnected(peer_id) {
self.inject_error(*peer_id, request_id, RPCError::Disconnected);
}
// Remove peer from all data structures
self.range_sync.peer_disconnect(&mut self.network, peer_id);
let _ = self
.backfill_sync
.peer_disconnected(peer_id, &mut self.network);
self.block_lookups.peer_disconnected(peer_id);
// Regardless of the outcome, we update the sync status.
let _ = self.backfill_sync.peer_disconnected(peer_id);
self.update_sync_state();
}
@@ -951,7 +974,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
self.network.insert_range_blocks_and_blobs_request(
id,
resp.sender_id,
BlocksAndBlobsRequestInfo::new(resp.request_type),
BlocksAndBlobsRequestInfo::new(resp.request_type, peer_id),
);
// inform range that the request needs to be treated as failed
// With time we will want to downgrade this log

View File

@@ -177,6 +177,46 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
}
}
/// Returns the ids of all the requests made to the given peer_id.
///
/// Three request maps are scanned: range blocks-and-blobs requests, single-block
/// (blocks-by-root) requests and single-blob (blobs-by-root) requests. Each matching entry is
/// reported as the corresponding `SyncRequestId` variant, in that order.
///
/// The maps are only iterated here; no entry is removed by this method.
pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Vec<SyncRequestId> {
// Range requests are stored as a tuple; the request info (which records the peer) is the
// second element.
let failed_range_ids =
self.range_blocks_and_blobs_requests
.iter()
.filter_map(|(id, request)| {
if request.1.peer_id == *peer_id {
Some(SyncRequestId::RangeBlockAndBlobs { id: *id })
} else {
None
}
});
// Single block requests made to this peer.
let failed_block_ids = self
.blocks_by_root_requests
.iter()
.filter_map(|(id, request)| {
if request.peer_id == *peer_id {
Some(SyncRequestId::SingleBlock { id: *id })
} else {
None
}
});
// Single blob requests made to this peer.
let failed_blob_ids = self
.blobs_by_root_requests
.iter()
.filter_map(|(id, request)| {
if request.peer_id == *peer_id {
Some(SyncRequestId::SingleBlob { id: *id })
} else {
None
}
});
// The iterators above are lazy; the maps are only walked when collecting here.
failed_range_ids
.chain(failed_block_ids)
.chain(failed_blob_ids)
.collect()
}
pub fn network_globals(&self) -> &NetworkGlobals<T::EthSpec> {
&self.network_beacon_processor.network_globals
}
@@ -272,8 +312,13 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
sender_id: RangeRequestId,
) -> Result<Id, RpcRequestSendError> {
let id = self.blocks_by_range_request(peer_id, batch_type, request)?;
self.range_blocks_and_blobs_requests
.insert(id, (sender_id, BlocksAndBlobsRequestInfo::new(batch_type)));
self.range_blocks_and_blobs_requests.insert(
id,
(
sender_id,
BlocksAndBlobsRequestInfo::new(batch_type, peer_id),
),
);
Ok(id)
}
@@ -343,7 +388,10 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
// Block is known and is currently processing, expect a future event with the result of
// processing.
BlockProcessStatus::NotValidated { .. } => {
return Ok(LookupRequestResult::Pending("block in processing cache"))
// Lookup sync event safety: If the block is currently in the processing cache, we
// are guaranteed to receive a `SyncMessage::GossipBlockProcessResult` that will
// make progress on this lookup
return Ok(LookupRequestResult::Pending("block in processing cache"));
}
// Block is fully validated. If it's not yet imported it's waiting for missing block
// components. Consider this request completed and do nothing.
@@ -366,6 +414,12 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
let request = BlocksByRootSingleRequest(block_root);
// Lookup sync event safety: If network_send.send() returns Ok(_) we are guaranteed that
// eventually at least one of these 3 events will be received:
// - StreamTermination(request_id): handled by `Self::on_single_block_response`
// - RPCError(request_id): handled by `Self::on_single_block_response`
// - Disconnect(peer_id): handled by `Self::peer_disconnected`, which converts it to a
// `RPCError(request_id)` event handled by the above method
self.network_send
.send(NetworkMessage::SendRequest {
peer_id,
@@ -375,7 +429,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.map_err(|_| RpcRequestSendError::NetworkSendError)?;
self.blocks_by_root_requests
.insert(id, ActiveBlocksByRootRequest::new(request));
.insert(id, ActiveBlocksByRootRequest::new(request, peer_id));
Ok(LookupRequestResult::RequestSent(req_id))
}
@@ -408,6 +462,13 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
// latter handle the case where if the peer sent no blobs, penalize.
// - if `downloaded_block_expected_blobs` is Some = block is downloading or processing.
// - if `num_expected_blobs` returns Some = block is processed.
//
// Lookup sync event safety: Reaching this code means that a block is not in any pre-import
// cache nor in the request state of this lookup. Therefore, the block must either: (1) not
// be downloaded yet or (2) the block is already imported into the fork-choice.
// In case (1) the lookup must either successfully download the block or get dropped.
// In case (2) the block will be downloaded, processed, reach `BlockIsAlreadyKnown` and
// get dropped as completed.
return Ok(LookupRequestResult::Pending("waiting for block download"));
};
@@ -444,6 +505,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
indices,
};
// Lookup sync event safety: Refer to `Self::block_lookup_request` `network_send.send` call
self.network_send
.send(NetworkMessage::SendRequest {
peer_id,
@@ -453,7 +515,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.map_err(|_| RpcRequestSendError::NetworkSendError)?;
self.blobs_by_root_requests
.insert(id, ActiveBlobsByRootRequest::new(request));
.insert(id, ActiveBlobsByRootRequest::new(request, peer_id));
Ok(LookupRequestResult::RequestSent(req_id))
}
@@ -660,6 +722,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.ok_or(SendErrorProcessor::ProcessorNotAvailable)?;
debug!(self.log, "Sending block for processing"; "block" => ?block_root, "id" => id);
// Lookup sync event safety: If `beacon_processor.send_rpc_beacon_block` returns Ok() sync
// must receive a single `SyncMessage::BlockComponentProcessed` with this process type
beacon_processor
.send_rpc_beacon_block(
block_root,
@@ -689,6 +753,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.ok_or(SendErrorProcessor::ProcessorNotAvailable)?;
debug!(self.log, "Sending blobs for processing"; "block" => ?block_root, "id" => id);
// Lookup sync event safety: If `beacon_processor.send_rpc_blobs` returns Ok() sync
// must receive a single `SyncMessage::BlockComponentProcessed` event with this process type
beacon_processor
.send_rpc_blobs(
block_root,

View File

@@ -1,5 +1,8 @@
use beacon_chain::get_block_root;
use lighthouse_network::rpc::{methods::BlobsByRootRequest, BlocksByRootRequest};
use lighthouse_network::{
rpc::{methods::BlobsByRootRequest, BlocksByRootRequest},
PeerId,
};
use std::sync::Arc;
use strum::IntoStaticStr;
use types::{
@@ -20,13 +23,15 @@ pub enum LookupVerifyError {
pub struct ActiveBlocksByRootRequest {
request: BlocksByRootSingleRequest,
resolved: bool,
pub(crate) peer_id: PeerId,
}
impl ActiveBlocksByRootRequest {
pub fn new(request: BlocksByRootSingleRequest) -> Self {
pub fn new(request: BlocksByRootSingleRequest, peer_id: PeerId) -> Self {
Self {
request,
resolved: false,
peer_id,
}
}
@@ -94,14 +99,16 @@ pub struct ActiveBlobsByRootRequest<E: EthSpec> {
request: BlobsByRootSingleBlockRequest,
blobs: Vec<Arc<BlobSidecar<E>>>,
resolved: bool,
pub(crate) peer_id: PeerId,
}
impl<E: EthSpec> ActiveBlobsByRootRequest<E> {
pub fn new(request: BlobsByRootSingleBlockRequest) -> Self {
pub fn new(request: BlobsByRootSingleBlockRequest, peer_id: PeerId) -> Self {
Self {
request,
blobs: vec![],
resolved: false,
peer_id,
}
}

View File

@@ -174,8 +174,30 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
/// Removes a peer from the chain.
/// If the peer has active batches, those are considered failed and re-requested.
pub fn remove_peer(&mut self, peer_id: &PeerId) -> ProcessingResult {
self.peers.remove(peer_id);
pub fn remove_peer(
&mut self,
peer_id: &PeerId,
network: &mut SyncNetworkContext<T>,
) -> ProcessingResult {
if let Some(batch_ids) = self.peers.remove(peer_id) {
// fail the batches.
for id in batch_ids {
if let Some(batch) = self.batches.get_mut(&id) {
if let BatchOperationOutcome::Failed { blacklist } =
batch.download_failed(true)?
{
return Err(RemoveChain::ChainFailed {
blacklist,
failing_batch: id,
});
}
self.retry_batch_download(network, id)?;
} else {
debug!(self.log, "Batch not found while removing peer";
"peer" => %peer_id, "batch" => id)
}
}
}
if self.peers.is_empty() {
Err(RemoveChain::EmptyPeerPool)

View File

@@ -278,8 +278,9 @@ where
/// for this peer. If so we mark the batch as failed. The batch may then hit its maximum
/// retries. In this case, we need to remove the chain.
fn remove_peer(&mut self, network: &mut SyncNetworkContext<T>, peer_id: &PeerId) {
for (removed_chain, sync_type, remove_reason) in
self.chains.call_all(|chain| chain.remove_peer(peer_id))
for (removed_chain, sync_type, remove_reason) in self
.chains
.call_all(|chain| chain.remove_peer(peer_id, network))
{
self.on_chain_removed(
removed_chain,

View File

@@ -146,8 +146,19 @@ For more information on historic state storage see the
To manually specify a checkpoint use the following two flags:
* `--checkpoint-state`: accepts an SSZ-encoded `BeaconState` blob
* `--checkpoint-block`: accepts an SSZ-encoded `SignedBeaconBlock` blob
* `--checkpoint-state`: accepts an SSZ-encoded `BeaconState` file
* `--checkpoint-block`: accepts an SSZ-encoded `SignedBeaconBlock` file
* `--checkpoint-blobs`: accepts an SSZ-encoded `Blobs` file
The commands are as follows:
```bash
curl -H "Accept: application/octet-stream" "http://localhost:5052/eth/v2/debug/beacon/states/$SLOT" > state.ssz
curl -H "Accept: application/octet-stream" "http://localhost:5052/eth/v2/beacon/blocks/$SLOT" > block.ssz
curl -H "Accept: application/octet-stream" "http://localhost:5052/eth/v1/beacon/blob_sidecars/$SLOT" > blobs.ssz
```
where `$SLOT` is the slot number. It can be specified as `head` or `finalized` as well.
_Both_ the state and block must be provided and the state **must** match the block. The
state may be from the same slot as the block (unadvanced), or advanced to an epoch boundary,

View File

@@ -16,6 +16,7 @@ validator client or the slasher**.
| Lighthouse version | Release date | Schema version | Downgrade available? |
|--------------------|--------------|----------------|----------------------|
| v5.2.0 | Jun 2024 | v19 | yes before Deneb |
| v5.1.0 | Mar 2024 | v19 | yes before Deneb |
| v5.0.0 | Feb 2024 | v19 | yes before Deneb |
| v4.6.0 | Dec 2023 | v19 | yes before Deneb |

View File

@@ -15,6 +15,7 @@
- [My beacon node logs `WARN Error signalling fork choice waiter`, what should I do?](#bn-fork-choice)
- [My beacon node logs `ERRO Aggregate attestation queue full`, what should I do?](#bn-queue-full)
- [My beacon node logs `WARN Failed to finalize deposit cache`, what should I do?](#bn-deposit-cache)
- [My beacon node logs `WARN Could not verify blob sidecar for gossip`, what does it mean?](#bn-blob)
## [Validator](#validator-1)
@@ -214,6 +215,16 @@ This suggests that the computer resources are being overwhelmed. It could be due
This is a known [bug](https://github.com/sigp/lighthouse/issues/3707) that will resolve by itself.
### <a name="bn-blob"></a> My beacon node logs `WARN Could not verify blob sidecar for gossip`, what does it mean?
An example of the full log is shown below:
```text
Jun 07 23:05:12.170 WARN Could not verify blob sidecar for gossip. Ignoring the blob sidecar, commitment: 0xaa97…6f54, index: 1, root: 0x93b8…c47c, slot: 9248017, error: PastFinalizedSlot { blob_slot: Slot(9248017), finalized_slot: Slot(9248032) }, module: network::network_beacon_processor::gossip_methods:720
```
The `PastFinalizedSlot` error indicates that the node received the blob after the blob's slot had already been finalized (in the example above, `blob_slot` is earlier than `finalized_slot`). This could be due to a peer sending an old blob. The log will stop appearing once Lighthouse eventually drops the peer.
## Validator
### <a name="vc-activation"></a> Why does it take so long for a validator to be activated?
@@ -327,13 +338,24 @@ The first thing is to ensure both consensus and execution clients are synced wit
You can see more information on the [Ethstaker KB](https://ethstaker.gitbook.io/ethstaker-knowledge-base/help/missed-attestations).
Another cause for missing attestations is delays during block processing. When this happens, the debug logs will show (debug logs can be found under `$datadir/beacon/logs`):
Another cause for missing attestations is the block arriving late, or delays during block processing.
An example of the log: (debug logs can be found under `$datadir/beacon/logs`):
```text
DEBG Delayed head block set_as_head_delay: Some(93.579425ms), imported_delay: Some(1.460405278s), observed_delay: Some(2.540811921s), block_delay: 4.094796624s, slot: 6837344, proposer_index: 211108, block_root: 0x2c52231c0a5a117401f5231585de8aa5dd963bc7cbc00c544e681342eedd1700, service: beacon
Delayed head block, set_as_head_time_ms: 27, imported_time_ms: 168, attestable_delay_ms: 4209, available_delay_ms: 4186, execution_time_ms: 201, blob_delay_ms: 3815, observed_delay_ms: 3984, total_delay_ms: 4381, slot: 1886014, proposer_index: 733, block_root: 0xa7390baac88d50f1cbb5ad81691915f6402385a12521a670bbbd4cd5f8bf3934, service: beacon, module: beacon_chain::canonical_head:1441
```
The fields to look for are `imported_delay > 1s` and `observed_delay < 3s`. The `imported_delay` is how long the node took to process the block. The `imported_delay` of larger than 1 second suggests that there is slowness in processing the block. It could be due to high CPU usage, high I/O disk usage or the clients are doing some background maintenance processes. The `observed_delay` is determined mostly by the proposer and partly by your networking setup (e.g., how long it took for the node to receive the block). The `observed_delay` of less than 3 seconds means that the block is not arriving late from the block proposer. Combining the above, this implies that the validator should have been able to attest to the block, but failed due to slowness in the node processing the block.
The field to look for is `attestable_delay`, which defines the time when a block is ready for the validator to attest. If the `attestable_delay` is greater than 4s, which is past the attestation window, the attestation will fail. In the above example, the delay is mostly caused by the block being observed late by the node, as shown in `observed_delay`. The `observed_delay` is determined mostly by the proposer and partly by your networking setup (e.g., how long it took for the node to receive the block). Ideally, `observed_delay` should be less than 3 seconds. In this example, the validator failed to attest to the block because the block arrived late.
Another example of log:
```
DEBG Delayed head block, set_as_head_time_ms: 22, imported_time_ms: 312, attestable_delay_ms: 7052, available_delay_ms: 6874, execution_time_ms: 4694, blob_delay_ms: 2159, observed_delay_ms: 2179, total_delay_ms: 7209, slot: 1885922, proposer_index: 606896, block_root: 0x9966df24d24e722d7133068186f0caa098428696e9f441ac416d0aca70cc0a23, service: beacon, module: beacon_chain::canonical_head:1441
/159.69.68.247/tcp/9000, service: libp2p, module: lighthouse_network::service:1811
```
In this example, we see that the `execution_time_ms` is 4694ms. The `execution_time_ms` is how long the node took to process the block. An `execution_time_ms` larger than 1 second suggests that the node is slow in processing the block. A high `execution_time_ms` could be due to high CPU usage, high disk I/O usage, or the clients doing some background maintenance processes.
### <a name="vc-head-vote"></a> Sometimes I miss the attestation head vote, resulting in penalty. Is this normal?
@@ -514,21 +536,23 @@ If you would still like to subscribe to all subnets, you can use the flag `subsc
### <a name="net-quic"></a> How to know how many of my peers are connected via QUIC?
With `--metrics` enabled in the beacon node, you can find the number of peers connected via QUIC using:
With `--metrics` enabled in the beacon node, the [Grafana Network dashboard](https://github.com/sigp/lighthouse-metrics/blob/master/dashboards/Network.json) displays the connected peers by transport, which shows the number of peers connected via QUIC.
Alternatively, you can find the number of peers connected via QUIC manually using:
```bash
curl -s "http://localhost:5054/metrics" | grep libp2p_quic_peers
curl -s "http://localhost:5054/metrics" | grep 'transport="quic"'
```
A response example is:
```text
# HELP libp2p_quic_peers Count of libp2p peers currently connected via QUIC
# TYPE libp2p_quic_peers gauge
libp2p_quic_peers 4
libp2p_peers_multi{direction="inbound",transport="quic"} 27
libp2p_peers_multi{direction="none",transport="quic"} 0
libp2p_peers_multi{direction="outbound",transport="quic"} 9
```
which shows that there are 4 peers connected via QUIC.
which shows that there are a total of 36 peers connected via QUIC.
## Miscellaneous

View File

@@ -114,13 +114,13 @@ changed after initialization.
* Flag: `--slasher-max-db-size GIGABYTES`
* Argument: maximum size of the database in gigabytes
* Default: 256 GB
* Default: 512 GB
Both database backends LMDB and MDBX place a hard limit on the size of the database
file. You can use the `--slasher-max-db-size` flag to set this limit. It can be adjusted after
initialization if the limit is reached.
By default the limit is set to accommodate the default history length and around 600K validators (with about 30% headroom) but
By default the limit is set to accommodate the default history length and around 1 million validators but
you can set it lower if running with a reduced history length. The space required scales
approximately linearly in validator count and history length, i.e. if you halve either you can halve
the space required.

View File

@@ -75,7 +75,7 @@ Once you have the slashing protection database from your existing client, you ca
using this command:
```bash
lighthouse account validator slashing-protection import <my_interchange.json>
lighthouse account validator slashing-protection import filename.json
```
When importing an interchange file, you still need to import the validator keystores themselves
@@ -86,7 +86,7 @@ separately, using the instructions for [import validator keys](./mainnet-validat
You can export Lighthouse's database for use with another client with this command:
```
lighthouse account validator slashing-protection export <lighthouse_interchange.json>
lighthouse account validator slashing-protection export filename.json
```
The validator client needs to be stopped in order to export, to guarantee that the data exported is

View File

@@ -1,6 +1,6 @@
[package]
name = "boot_node"
version = "5.2.0"
version = "5.2.1"
authors = ["Sigma Prime <contact@sigmaprime.io>"]
edition = { workspace = true }

View File

@@ -17,8 +17,8 @@ pub const VERSION: &str = git_version!(
// NOTE: using --match instead of --exclude for compatibility with old Git
"--match=thiswillnevermatchlol"
],
prefix = "Lighthouse/v5.2.0-",
fallback = "Lighthouse/v5.2.0"
prefix = "Lighthouse/v5.2.1-",
fallback = "Lighthouse/v5.2.1"
);
/// Returns the first eight characters of the latest commit hash for this build.

View File

@@ -1,7 +1,7 @@
[package]
name = "lcli"
description = "Lighthouse CLI (modeled after zcli)"
version = "5.2.0"
version = "5.2.1"
authors = ["Paul Hauner <paul@paulhauner.com>"]
edition = { workspace = true }

View File

@@ -1,6 +1,6 @@
[package]
name = "lighthouse"
version = "5.2.0"
version = "5.2.1"
authors = ["Sigma Prime <contact@sigmaprime.io>"]
edition = { workspace = true }
autotests = false

View File

@@ -11,7 +11,7 @@ pub const DEFAULT_VALIDATOR_CHUNK_SIZE: usize = 256;
pub const DEFAULT_HISTORY_LENGTH: usize = 4096;
pub const DEFAULT_UPDATE_PERIOD: u64 = 12;
pub const DEFAULT_SLOT_OFFSET: f64 = 10.5;
pub const DEFAULT_MAX_DB_SIZE: usize = 256 * 1024; // 256 GiB
pub const DEFAULT_MAX_DB_SIZE: usize = 512 * 1024; // 512 GiB
pub const DEFAULT_ATTESTATION_ROOT_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(100_000);
pub const DEFAULT_BROADCAST: bool = false;