Merge branch 'deneb-free-blobs' of https://github.com/sigp/lighthouse into some-blob-reprocessing-work

This commit is contained in:
realbigsean
2023-06-02 08:19:58 -04:00
333 changed files with 5934 additions and 13398 deletions

View File

@@ -21,8 +21,8 @@ types = { path = "../../consensus/types" }
slot_clock = { path = "../../common/slot_clock" }
slog = { version = "2.5.2", features = ["max_level_trace", "nested-values"] }
hex = "0.4.2"
eth2_ssz = "0.4.1"
eth2_ssz_types = "0.2.2"
ethereum_ssz = "0.5.0"
ssz_types = "0.5.0"
futures = "0.3.7"
error-chain = "0.12.4"
tokio = { version = "1.14.0", features = ["full"] }
@@ -35,7 +35,7 @@ lazy_static = "1.4.0"
lighthouse_metrics = { path = "../../common/lighthouse_metrics" }
logging = { path = "../../common/logging" }
task_executor = { path = "../../common/task_executor" }
igd = "0.11.1"
igd = "0.12.1"
itertools = "0.10.0"
num_cpus = "1.13.0"
lru_cache = { path = "../../common/lru_cache" }

View File

@@ -56,6 +56,7 @@ use logging::TimeLatch;
use slog::{crit, debug, error, trace, warn, Logger};
use std::collections::VecDeque;
use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::{Arc, Weak};
use std::task::Context;
@@ -1096,6 +1097,13 @@ impl<T: BeaconChainTypes> Stream for InboundEvents<T> {
}
}
/// Defines if and where we will store the SSZ files of invalid blocks.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum InvalidBlockStorage {
    /// Invalid blocks are written into the directory at the contained path.
    Enabled(PathBuf),
    /// Invalid blocks are not stored.
    Disabled,
}
/// A multi-threaded processor for messages received on the network
/// that need to be processed by the `BeaconChain`
///
@@ -1109,6 +1117,7 @@ pub struct BeaconProcessor<T: BeaconChainTypes> {
pub max_workers: usize,
pub current_workers: usize,
pub importing_blocks: DuplicateCache,
pub invalid_block_storage: InvalidBlockStorage,
pub log: Logger,
}
@@ -1817,19 +1826,23 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
peer_client,
block,
seen_timestamp,
} => task_spawner.spawn_async(async move {
worker
.process_gossip_block(
message_id,
peer_id,
peer_client,
block.into(),
work_reprocessing_tx,
duplicate_cache,
seen_timestamp,
)
.await
}),
} => {
let invalid_block_storage = self.invalid_block_storage.clone();
task_spawner.spawn_async(async move {
worker
.process_gossip_block(
message_id,
peer_id,
peer_client,
block.into(),
work_reprocessing_tx,
duplicate_cache,
invalid_block_storage,
seen_timestamp,
)
.await
})
}
/*
* Verification for blobs sidecars received on gossip.
*/
@@ -1859,12 +1872,16 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
peer_id,
block,
seen_timestamp,
} => task_spawner.spawn_async(worker.process_gossip_verified_block(
peer_id,
*block,
work_reprocessing_tx,
seen_timestamp,
)),
} => {
let invalid_block_storage = self.invalid_block_storage.clone();
task_spawner.spawn_async(worker.process_gossip_verified_block(
peer_id,
*block,
work_reprocessing_tx,
invalid_block_storage,
seen_timestamp,
))
}
/*
* Voluntary exits received on gossip.
*/

View File

@@ -203,6 +203,7 @@ impl TestRig {
max_workers: cmp::max(1, num_cpus::get()),
current_workers: 0,
importing_blocks: duplicate_cache.clone(),
invalid_block_storage: InvalidBlockStorage::Disabled,
log: log.clone(),
}
.spawn_manager(beacon_processor_rx, Some(work_journal_tx));

View File

@@ -14,17 +14,20 @@ use beacon_chain::{
};
use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource};
use operation_pool::ReceivedPreCapella;
use slog::{crit, debug, error, info, trace, warn};
use slog::{crit, debug, error, info, trace, warn, Logger};
use slot_clock::SlotClock;
use ssz::Encode;
use std::fs;
use std::io::Write;
use std::path::PathBuf;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use store::hot_cold_store::HotColdDBError;
use tokio::sync::mpsc;
use types::{
Attestation, AttesterSlashing, EthSpec, Hash256, IndexedAttestation, LightClientFinalityUpdate,
LightClientOptimisticUpdate, ProposerSlashing, SignedAggregateAndProof, SignedBlobSidecar,
SignedBlsToExecutionChange, SignedContributionAndProof, SignedVoluntaryExit, Slot, SubnetId,
SyncCommitteeMessage, SyncSubnetId,
LightClientOptimisticUpdate, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock,
SignedBlobSidecar, SignedBlsToExecutionChange, SignedContributionAndProof, SignedVoluntaryExit,
Slot, SubnetId, SyncCommitteeMessage, SyncSubnetId,
};
use super::{
@@ -34,7 +37,7 @@ use super::{
},
Worker,
};
use crate::beacon_processor::DuplicateCache;
use crate::beacon_processor::{DuplicateCache, InvalidBlockStorage};
/// Set to `true` to introduce stricter penalties for peers who send some types of late consensus
/// messages.
@@ -806,6 +809,7 @@ impl<T: BeaconChainTypes> Worker<T> {
block: BlockWrapper<T::EthSpec>,
reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>,
duplicate_cache: DuplicateCache,
invalid_block_storage: InvalidBlockStorage,
seen_duration: Duration,
) {
if let Some(gossip_verified_block) = self
@@ -826,6 +830,7 @@ impl<T: BeaconChainTypes> Worker<T> {
peer_id,
gossip_verified_block,
reprocess_tx,
invalid_block_storage,
seen_duration,
)
.await;
@@ -1089,13 +1094,14 @@ impl<T: BeaconChainTypes> Worker<T> {
peer_id: PeerId,
verified_block: GossipVerifiedBlock<T>,
reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>,
invalid_block_storage: InvalidBlockStorage,
// This value is not used presently, but it might come in handy for debugging.
_seen_duration: Duration,
) {
let block = verified_block.block.block_cloned();
let block_root = verified_block.block_root;
match self
let result = self
.chain
.process_block(
block_root,
@@ -1103,14 +1109,15 @@ impl<T: BeaconChainTypes> Worker<T> {
CountUnrealized::True,
NotifyExecutionLayer::Yes,
)
.await
{
.await;
match &result {
Ok(AvailabilityProcessingStatus::Imported(block_root)) => {
metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL);
if reprocess_tx
.try_send(ReprocessQueueMessage::BlockImported {
block_root,
block_root: *block_root,
parent_root: block.message().parent_root(),
})
.is_err()
@@ -1146,7 +1153,11 @@ impl<T: BeaconChainTypes> Worker<T> {
"Block with unknown parent attempted to be processed";
"peer_id" => %peer_id
);
self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block, block_root));
self.send_sync_message(SyncMessage::UnknownBlock(
peer_id,
block.clone(),
block_root,
));
}
Err(ref e @ BlockError::ExecutionPayloadError(ref epe)) if !epe.penalize_peer() => {
debug!(
@@ -1175,6 +1186,16 @@ impl<T: BeaconChainTypes> Worker<T> {
);
}
};
if let Err(e) = &result {
self.maybe_store_invalid_block(
&invalid_block_storage,
block_root,
&block,
e,
&self.log,
);
}
}
pub fn process_gossip_voluntary_exit(
@@ -2485,6 +2506,25 @@ impl<T: BeaconChainTypes> Worker<T> {
"peer_id" => %peer_id,
"type" => ?message_type,
);
// Do not penalize the peer.
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
return;
}
SyncCommitteeError::PriorSyncContributionMessageKnown { .. } => {
/*
* We have already seen a sync contribution message from this validator for this epoch.
*
* The peer is not necessarily faulty.
*/
debug!(
self.log,
"Prior sync contribution message known";
"peer_id" => %peer_id,
"type" => ?message_type,
);
// We still penalize the peer slightly. We don't want this to be a recurring
// behaviour.
self.gossip_penalize_peer(
@@ -2649,4 +2689,62 @@ impl<T: BeaconChainTypes> Worker<T> {
self.propagate_if_timely(is_timely, message_id, peer_id)
}
/// Stores a block as a SSZ file, if and where `invalid_block_storage` dictates.
///
/// When storage is `Enabled(base_dir)`, two files are written into `base_dir`:
///
/// - `{slot}_{block_root:?}.ssz`: the SSZ encoding of `block`.
/// - `{slot}_{block_root:?}.error`: the `to_string()` rendering of `error`.
///
/// A file that already exists is left untouched. Failures to open *or* to
/// write a file are logged at `error` level and never propagated to the
/// caller. When storage is `Disabled` this is a no-op.
fn maybe_store_invalid_block(
    &self,
    invalid_block_storage: &InvalidBlockStorage,
    block_root: Hash256,
    block: &SignedBeaconBlock<T::EthSpec>,
    error: &BlockError<T::EthSpec>,
    log: &Logger,
) {
    if let InvalidBlockStorage::Enabled(base_dir) = invalid_block_storage {
        let block_path = base_dir.join(format!("{}_{:?}.ssz", block.slot(), block_root));
        let error_path = base_dir.join(format!("{}_{:?}.error", block.slot(), block_root));

        let write_file = |path: PathBuf, bytes: &[u8]| {
            // No need to write the same file twice. For the error file,
            // this means that we'll remember the first error message but
            // forget the rest.
            if path.exists() {
                return;
            }

            // Write to the file.
            let write_result = fs::OpenOptions::new()
                // Only succeed if the file doesn't already exist. We should
                // have checked for this earlier.
                .create_new(true)
                .write(true)
                .open(&path)
                .map_err(|e| format!("Failed to open file: {:?}", e))
                // `and_then` (rather than `map`) flattens the write result into
                // `write_result`, so a failed `write_all` reaches the error
                // branch below instead of being dropped inside a nested `Ok`.
                .and_then(|mut file| {
                    file.write_all(bytes)
                        .map_err(|e| format!("Failed to write file: {:?}", e))
                });

            if let Err(e) = write_result {
                error!(
                    log,
                    "Failed to store invalid block/error";
                    "error" => e,
                    "path" => ?path,
                    "root" => ?block_root,
                    "slot" => block.slot(),
                )
            } else {
                info!(
                    log,
                    "Stored invalid block/error ";
                    "path" => ?path,
                    "root" => ?block_root,
                    "slot" => block.slot(),
                )
            }
        };

        write_file(block_path, &block.as_ssz_bytes());
        write_file(error_path, error.to_string().as_bytes());
    }
}
}

View File

@@ -118,10 +118,26 @@ impl<T: BeaconChainTypes> Worker<T> {
}
};
// Returns `true` if the block is already known to fork choice. Notably,
// this will return `false` for blocks that we've already imported but
// that are ancestors of the finalized checkpoint. That should not be an
// issue for our use here since finalized blocks will always be late and
// won't be requeued anyway.
let block_is_already_known = || {
self.chain
.canonical_head
.fork_choice_read_lock()
.contains_block(&block_root)
};
// If we've already seen a block from this proposer *and* the block
// arrived before the attestation deadline, requeue it to ensure it is
// imported late enough that it won't receive a proposer boost.
if !block_is_late && proposal_already_known() {
//
// Don't requeue blocks if they're already known to fork choice, just
// push them through to block processing so they can be handled through
// the normal channels.
if !block_is_late && proposal_already_known() && !block_is_already_known() {
debug!(
self.log,
"Delaying processing of duplicate RPC block";

View File

@@ -6,7 +6,7 @@
#![allow(clippy::unit_arg)]
use crate::beacon_processor::{
BeaconProcessor, WorkEvent as BeaconWorkEvent, MAX_WORK_EVENT_QUEUE_LEN,
BeaconProcessor, InvalidBlockStorage, WorkEvent as BeaconWorkEvent, MAX_WORK_EVENT_QUEUE_LEN,
};
use crate::error;
use crate::service::{NetworkMessage, RequestId};
@@ -80,6 +80,7 @@ impl<T: BeaconChainTypes> Router<T> {
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
executor: task_executor::TaskExecutor,
invalid_block_storage: InvalidBlockStorage,
log: slog::Logger,
) -> error::Result<mpsc::UnboundedSender<RouterMessage<T::EthSpec>>> {
let message_handler_log = log.new(o!("service"=> "router"));
@@ -111,6 +112,7 @@ impl<T: BeaconChainTypes> Router<T> {
max_workers: cmp::max(1, num_cpus::get()),
current_workers: 0,
importing_blocks: Default::default(),
invalid_block_storage,
log: log.clone(),
}
.spawn_manager(beacon_processor_receive, None);

View File

@@ -1,4 +1,5 @@
use super::sync::manager::RequestId as SyncId;
use crate::beacon_processor::InvalidBlockStorage;
use crate::persisted_dht::{clear_dht, load_dht, persist_dht};
use crate::router::{Router, RouterMessage};
use crate::subnet_service::SyncCommitteeService;
@@ -13,6 +14,7 @@ use futures::future::OptionFuture;
use futures::prelude::*;
use futures::StreamExt;
use lighthouse_network::service::Network;
use lighthouse_network::types::GossipKind;
use lighthouse_network::{prometheus_client::registry::Registry, MessageAcceptance};
use lighthouse_network::{
rpc::{GoodbyeReason, RPCResponseErrorCode},
@@ -23,7 +25,7 @@ use lighthouse_network::{
MessageId, NetworkEvent, NetworkGlobals, PeerId,
};
use slog::{crit, debug, error, info, o, trace, warn};
use std::{net::SocketAddr, pin::Pin, sync::Arc, time::Duration};
use std::{collections::HashSet, net::SocketAddr, pin::Pin, sync::Arc, time::Duration};
use store::HotColdDB;
use strum::IntoStaticStr;
use task_executor::ShutdownReason;
@@ -294,6 +296,12 @@ impl<T: BeaconChainTypes> NetworkService<T> {
}
}
let invalid_block_storage = config
.invalid_block_storage
.clone()
.map(InvalidBlockStorage::Enabled)
.unwrap_or(InvalidBlockStorage::Disabled);
// launch derived network services
// router task
@@ -302,6 +310,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
network_globals.clone(),
network_senders.network_send(),
executor.clone(),
invalid_block_storage,
network_log.clone(),
)?;
@@ -672,6 +681,10 @@ impl<T: BeaconChainTypes> NetworkService<T> {
source,
} => self.libp2p.goodbye_peer(&peer_id, reason, source),
NetworkMessage::SubscribeCoreTopics => {
if self.subscribed_core_topics() {
return;
}
if self.shutdown_after_sync {
if let Err(e) = shutdown_sender
.send(ShutdownReason::Success(
@@ -912,6 +925,16 @@ impl<T: BeaconChainTypes> NetworkService<T> {
crit!(self.log, "Unknown new enr fork id"; "new_fork_id" => ?new_enr_fork_id);
}
}
/// Returns `true` when every core topic for the current fork is present in
/// the set of gossipsub topics this node is currently subscribed to.
fn subscribed_core_topics(&self) -> bool {
    // Topics required for the fork we are currently on.
    let required_kinds = core_topics_to_subscribe::<T::EthSpec>(self.fork_context.current_fork());

    // Kinds of all topics we are subscribed to right now.
    let subscriptions = self.network_globals.gossipsub_subscriptions.read();
    let active_kinds: HashSet<&GossipKind> =
        subscriptions.iter().map(|topic| topic.kind()).collect();

    // Subset check: each required kind must be among the active ones.
    required_kinds
        .iter()
        .all(|kind| active_kinds.contains(kind))
}
}
/// Returns a `Sleep` that triggers after the next change in the beacon chain fork version.

View File

@@ -115,6 +115,9 @@ pub struct AttestationService<T: BeaconChainTypes> {
#[cfg(feature = "deterministic_long_lived_attnets")]
next_long_lived_subscription_event: Pin<Box<tokio::time::Sleep>>,
/// Whether this node is a block proposer-only node.
proposer_only: bool,
/// The logger for the attestation service.
log: slog::Logger,
}
@@ -158,6 +161,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
known_validators: HashSetDelay::new(last_seen_val_timeout),
waker: None,
discovery_disabled: config.disable_discovery,
proposer_only: config.proposer_only,
subscribe_all_subnets: config.subscribe_all_subnets,
long_lived_subnet_subscription_slots,
log,
@@ -259,6 +263,11 @@ impl<T: BeaconChainTypes> AttestationService<T> {
&mut self,
subscriptions: Vec<ValidatorSubscription>,
) -> Result<(), String> {
// If the node is in a proposer-only state, we ignore all subnet subscriptions.
if self.proposer_only {
return Ok(());
}
// Maps each subnet_id subscription to its highest slot
let mut subnets_to_discover: HashMap<SubnetId, Slot> = HashMap::new();
for subscription in subscriptions {
@@ -453,6 +462,10 @@ impl<T: BeaconChainTypes> AttestationService<T> {
subnet: SubnetId,
attestation: &Attestation<T::EthSpec>,
) -> bool {
// Proposer-only mode does not need to process attestations
if self.proposer_only {
return false;
}
self.aggregate_validators_on_subnet
.as_ref()
.map(|tracked_vals| {

View File

@@ -54,6 +54,9 @@ pub struct SyncCommitteeService<T: BeaconChainTypes> {
/// We are always subscribed to all subnets.
subscribe_all_subnets: bool,
/// Whether this node is a block proposer-only node.
proposer_only: bool,
/// The logger for the sync committee service.
log: slog::Logger,
}
@@ -82,6 +85,7 @@ impl<T: BeaconChainTypes> SyncCommitteeService<T> {
waker: None,
subscribe_all_subnets: config.subscribe_all_subnets,
discovery_disabled: config.disable_discovery,
proposer_only: config.proposer_only,
log,
}
}
@@ -110,6 +114,11 @@ impl<T: BeaconChainTypes> SyncCommitteeService<T> {
&mut self,
subscriptions: Vec<SyncCommitteeSubscription>,
) -> Result<(), String> {
// A proposer-only node does not subscribe to any sync-committees
if self.proposer_only {
return Ok(());
}
let mut subnets_to_discover = Vec::new();
for subscription in subscriptions {
metrics::inc_counter(&metrics::SYNC_COMMITTEE_SUBSCRIPTION_REQUESTS);

View File

@@ -160,20 +160,20 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
// If, for some reason a backfill has already been completed (or we've used a trusted
// genesis root) then backfill has been completed.
let (state, current_start) = if let Some(anchor_info) = beacon_chain.store.get_anchor_info()
{
if anchor_info.block_backfill_complete() {
(BackFillState::Completed, Epoch::new(0))
} else {
(
BackFillState::Paused,
anchor_info
.oldest_block_slot
.epoch(T::EthSpec::slots_per_epoch()),
)
let (state, current_start) = match beacon_chain.store.get_anchor_info() {
Some(anchor_info) => {
if anchor_info.block_backfill_complete(beacon_chain.genesis_backfill_slot) {
(BackFillState::Completed, Epoch::new(0))
} else {
(
BackFillState::Paused,
anchor_info
.oldest_block_slot
.epoch(T::EthSpec::slots_per_epoch()),
)
}
}
} else {
(BackFillState::NotRequired, Epoch::new(0))
None => (BackFillState::NotRequired, Epoch::new(0)),
};
let bfs = BackFillSync {
@@ -288,6 +288,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
remaining: self
.current_start
.start_slot(T::EthSpec::slots_per_epoch())
.saturating_sub(self.beacon_chain.genesis_backfill_slot)
.as_usize(),
})
}
@@ -1096,7 +1097,12 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
match self.batches.entry(batch_id) {
Entry::Occupied(_) => {
// this batch doesn't need downloading, let this same function decide the next batch
if batch_id == 0 {
if batch_id
== self
.beacon_chain
.genesis_backfill_slot
.epoch(T::EthSpec::slots_per_epoch())
{
self.last_batch_downloaded = true;
}
@@ -1112,7 +1118,12 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
BACKFILL_EPOCHS_PER_BATCH,
batch_type,
));
if batch_id == 0 {
if batch_id
== self
.beacon_chain
.genesis_backfill_slot
.epoch(T::EthSpec::slots_per_epoch())
{
self.last_batch_downloaded = true;
}
self.to_be_downloaded = self
@@ -1129,7 +1140,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
/// not required.
fn reset_start_epoch(&mut self) -> Result<(), ResetEpochError> {
if let Some(anchor_info) = self.beacon_chain.store.get_anchor_info() {
if anchor_info.block_backfill_complete() {
if anchor_info.block_backfill_complete(self.beacon_chain.genesis_backfill_slot) {
Err(ResetEpochError::SyncCompleted)
} else {
self.current_start = anchor_info
@@ -1144,12 +1155,17 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
/// Checks with the beacon chain if backfill sync has completed.
fn check_completed(&mut self) -> bool {
if self.current_start == 0 {
if self.current_start
== self
.beacon_chain
.genesis_backfill_slot
.epoch(T::EthSpec::slots_per_epoch())
{
// Check that the beacon chain agrees
if let Some(anchor_info) = self.beacon_chain.store.get_anchor_info() {
// Conditions that we have completed a backfill sync
if anchor_info.block_backfill_complete() {
if anchor_info.block_backfill_complete(self.beacon_chain.genesis_backfill_slot) {
return true;
} else {
error!(self.log, "Backfill out of sync with beacon chain");

View File

@@ -76,7 +76,7 @@ impl TestRig {
log.new(slog::o!("component" => "block_lookups")),
);
let cx = {
let globals = Arc::new(NetworkGlobals::new_test_globals(&log));
let globals = Arc::new(NetworkGlobals::new_test_globals(Vec::new(), &log));
SyncNetworkContext::new(
network_tx,
globals,

View File

@@ -599,7 +599,7 @@ mod tests {
log.new(o!("component" => "range")),
);
let (network_tx, network_rx) = mpsc::unbounded_channel();
let globals = Arc::new(NetworkGlobals::new_test_globals(&log));
let globals = Arc::new(NetworkGlobals::new_test_globals(Vec::new(), &log));
let cx = SyncNetworkContext::new(
network_tx,
globals.clone(),