diff --git a/beacon_node/lighthouse_network/src/config.rs b/beacon_node/lighthouse_network/src/config.rs index f4b3b78d04..5a11890a26 100644 --- a/beacon_node/lighthouse_network/src/config.rs +++ b/beacon_node/lighthouse_network/src/config.rs @@ -145,6 +145,9 @@ pub struct Config { /// Configuration for the outbound rate limiter (requests made by this node). pub outbound_rate_limiter_config: Option<OutboundRateLimiterConfig>, + + /// Configures if/where invalid blocks should be stored. + pub invalid_block_storage: Option<PathBuf>, } impl Config { @@ -329,6 +332,7 @@ impl Default for Config { metrics_enabled: false, enable_light_client_server: false, outbound_rate_limiter_config: None, + invalid_block_storage: None, } } } diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index 9603205228..26d2c19b51 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -54,6 +54,7 @@ use logging::TimeLatch; use slog::{crit, debug, error, trace, warn, Logger}; use std::collections::VecDeque; use std::future::Future; +use std::path::PathBuf; use std::pin::Pin; use std::sync::{Arc, Weak}; use std::task::Context; @@ -982,6 +983,13 @@ impl<T: BeaconChainTypes> Stream for InboundEvents<T> { } } +/// Defines if and where we will store the SSZ files of invalid blocks. 
+#[derive(Clone)] +pub enum InvalidBlockStorage { + Enabled(PathBuf), + Disabled, +} + /// A mutli-threaded processor for messages received on the network /// that need to be processed by the `BeaconChain` /// @@ -995,6 +1003,7 @@ pub struct BeaconProcessor<T: BeaconChainTypes> { pub max_workers: usize, pub current_workers: usize, pub importing_blocks: DuplicateCache, + pub invalid_block_storage: InvalidBlockStorage, pub log: Logger, } @@ -1676,19 +1685,23 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> { peer_client, block, seen_timestamp, - } => task_spawner.spawn_async(async move { - worker - .process_gossip_block( - message_id, - peer_id, - peer_client, - block, - work_reprocessing_tx, - duplicate_cache, - seen_timestamp, - ) - .await - }), + } => { + let invalid_block_storage = self.invalid_block_storage.clone(); + task_spawner.spawn_async(async move { + worker + .process_gossip_block( + message_id, + peer_id, + peer_client, + block, + work_reprocessing_tx, + duplicate_cache, + invalid_block_storage, + seen_timestamp, + ) + .await + }) + } /* * Import for blocks that we received earlier than their intended slot. */ @@ -1696,12 +1709,16 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> { peer_id, block, seen_timestamp, - } => task_spawner.spawn_async(worker.process_gossip_verified_block( - peer_id, - *block, - work_reprocessing_tx, - seen_timestamp, - )), + } => { + let invalid_block_storage = self.invalid_block_storage.clone(); + task_spawner.spawn_async(worker.process_gossip_verified_block( + peer_id, + *block, + work_reprocessing_tx, + invalid_block_storage, + seen_timestamp, + )) + } /* * Voluntary exits received on gossip. 
*/ diff --git a/beacon_node/network/src/beacon_processor/tests.rs b/beacon_node/network/src/beacon_processor/tests.rs index 4b0a159eb4..b93e83ad78 100644 --- a/beacon_node/network/src/beacon_processor/tests.rs +++ b/beacon_node/network/src/beacon_processor/tests.rs @@ -203,6 +203,7 @@ impl TestRig { max_workers: cmp::max(1, num_cpus::get()), current_workers: 0, importing_blocks: duplicate_cache.clone(), + invalid_block_storage: InvalidBlockStorage::Disabled, log: log.clone(), } .spawn_manager(beacon_processor_rx, Some(work_journal_tx)); diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 1ec03ae954..9d85bc545e 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -13,9 +13,12 @@ use beacon_chain::{ }; use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource}; use operation_pool::ReceivedPreCapella; -use slog::{crit, debug, error, info, trace, warn}; +use slog::{crit, debug, error, info, trace, warn, Logger}; use slot_clock::SlotClock; use ssz::Encode; +use std::fs; +use std::io::Write; +use std::path::PathBuf; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use store::hot_cold_store::HotColdDBError; @@ -34,7 +37,7 @@ use super::{ }, Worker, }; -use crate::beacon_processor::DuplicateCache; +use crate::beacon_processor::{DuplicateCache, InvalidBlockStorage}; /// Set to `true` to introduce stricter penalties for peers who send some types of late consensus /// messages. 
@@ -663,6 +666,7 @@ impl<T: BeaconChainTypes> Worker<T> { block: Arc<SignedBeaconBlock<T::EthSpec>>, reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>, duplicate_cache: DuplicateCache, + invalid_block_storage: InvalidBlockStorage, seen_duration: Duration, ) { if let Some(gossip_verified_block) = self @@ -683,6 +687,7 @@ impl<T: BeaconChainTypes> Worker<T> { peer_id, gossip_verified_block, reprocess_tx, + invalid_block_storage, seen_duration, ) .await; @@ -935,13 +940,14 @@ impl<T: BeaconChainTypes> Worker<T> { peer_id: PeerId, verified_block: GossipVerifiedBlock<T>, reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>, + invalid_block_storage: InvalidBlockStorage, // This value is not used presently, but it might come in handy for debugging. _seen_duration: Duration, ) { let block: Arc<_> = verified_block.block.clone(); let block_root = verified_block.block_root; - match self + let result = self .chain .process_block( block_root, @@ -949,14 +955,15 @@ impl<T: BeaconChainTypes> Worker<T> { CountUnrealized::True, NotifyExecutionLayer::Yes, ) - .await - { + .await; + + match &result { Ok(block_root) => { metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL); if reprocess_tx .try_send(ReprocessQueueMessage::BlockImported { - block_root, + block_root: *block_root, parent_root: block.message().parent_root(), }) .is_err() @@ -986,7 +993,11 @@ impl<T: BeaconChainTypes> Worker<T> { "Block with unknown parent attempted to be processed"; "peer_id" => %peer_id ); - self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block, block_root)); + self.send_sync_message(SyncMessage::UnknownBlock( + peer_id, + block.clone(), + block_root, + )); } Err(ref e @ BlockError::ExecutionPayloadError(ref epe)) if !epe.penalize_peer() => { debug!( @@ -1015,6 +1026,16 @@ impl<T: BeaconChainTypes> Worker<T> { ); } }; + + if let Err(e) = &result { + self.maybe_store_invalid_block( + &invalid_block_storage, + block_root, + &block, + e, + &self.log, + ); + } } pub fn process_gossip_voluntary_exit( @@ -2486,4 +2507,62 @@ impl<T: BeaconChainTypes> Worker<T> { self.propagate_if_timely(is_timely, message_id, peer_id) } + + /// Stores a block as a SSZ file, if and where `invalid_block_storage` dictates. 
+ fn maybe_store_invalid_block( + &self, + invalid_block_storage: &InvalidBlockStorage, + block_root: Hash256, + block: &SignedBeaconBlock<T::EthSpec>, + error: &BlockError<T::EthSpec>, + log: &Logger, + ) { + if let InvalidBlockStorage::Enabled(base_dir) = invalid_block_storage { + let block_path = base_dir.join(format!("{}_{:?}.ssz", block.slot(), block_root)); + let error_path = base_dir.join(format!("{}_{:?}.error", block.slot(), block_root)); + + let write_file = |path: PathBuf, bytes: &[u8]| { + // No need to write the same file twice. For the error file, + // this means that we'll remember the first error message but + // forget the rest. + if path.exists() { + return; + } + + // Write to the file. + let write_result = fs::OpenOptions::new() + // Only succeed if the file doesn't already exist. We should + // have checked for this earlier. + .create_new(true) + .write(true) + .open(&path) + .map_err(|e| format!("Failed to open file: {:?}", e)) + .map(|mut file| { + file.write_all(bytes) + .map_err(|e| format!("Failed to write file: {:?}", e)) + }); + if let Err(e) = write_result { + error!( + log, + "Failed to store invalid block/error"; + "error" => e, + "path" => ?path, + "root" => ?block_root, + "slot" => block.slot(), + ) + } else { + info!( + log, + "Stored invalid block/error "; + "path" => ?path, + "root" => ?block_root, + "slot" => block.slot(), + ) + } + }; + + write_file(block_path, &block.as_ssz_bytes()); + write_file(error_path, error.to_string().as_bytes()); + } + } } diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 7f75a27fe2..1b0f1fb41e 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -6,7 +6,7 @@ #![allow(clippy::unit_arg)] use crate::beacon_processor::{ - BeaconProcessor, WorkEvent as BeaconWorkEvent, MAX_WORK_EVENT_QUEUE_LEN, + BeaconProcessor, InvalidBlockStorage, WorkEvent as BeaconWorkEvent, MAX_WORK_EVENT_QUEUE_LEN, }; use crate::error; use crate::service::{NetworkMessage, 
RequestId}; @@ -81,6 +81,7 @@ impl<T: BeaconChainTypes> Router<T> { network_globals: Arc<NetworkGlobals<T::EthSpec>>, network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>, executor: task_executor::TaskExecutor, + invalid_block_storage: InvalidBlockStorage, log: slog::Logger, ) -> error::Result<mpsc::UnboundedSender<RouterMessage<T::EthSpec>>> { let message_handler_log = log.new(o!("service"=> "router")); @@ -112,6 +113,7 @@ impl<T: BeaconChainTypes> Router<T> { max_workers: cmp::max(1, num_cpus::get()), current_workers: 0, importing_blocks: Default::default(), + invalid_block_storage, log: log.clone(), } .spawn_manager(beacon_processor_receive, None); diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index d630cf9c39..edc1d5c2ef 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -1,4 +1,5 @@ use super::sync::manager::RequestId as SyncId; +use crate::beacon_processor::InvalidBlockStorage; use crate::persisted_dht::{clear_dht, load_dht, persist_dht}; use crate::router::{Router, RouterMessage}; use crate::subnet_service::SyncCommitteeService; @@ -295,6 +296,12 @@ impl<T: BeaconChainTypes> NetworkService<T> { } } + let invalid_block_storage = config + .invalid_block_storage + .clone() + .map(InvalidBlockStorage::Enabled) + .unwrap_or(InvalidBlockStorage::Disabled); + // launch derived network services // router task @@ -303,6 +310,7 @@ impl<T: BeaconChainTypes> NetworkService<T> { network_globals.clone(), network_senders.network_send(), executor.clone(), + invalid_block_storage, network_log.clone(), )?; diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 633cbf0438..b20b5c0a95 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -1093,4 +1093,13 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { // always using the builder. .conflicts_with("builder-profit-threshold") ) + .arg( + Arg::with_name("invalid-gossip-verified-blocks-path") + .long("invalid-gossip-verified-blocks-path") + .value_name("PATH") + .help("If a block succeeds gossip validation whilst failing full validation, store \ + the block SSZ as a file at this path. 
This feature is only recommended for \ + developers. This directory is not pruned, users should be careful to avoid \ + filling up their disks.") + ) } diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index f05fea2db1..6f626bee8d 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -793,6 +793,11 @@ pub fn get_config( client_config.chain.enable_backfill_rate_limiting = !cli_args.is_present("disable-backfill-rate-limiting"); + if let Some(path) = clap_utils::parse_optional(cli_args, "invalid-gossip-verified-blocks-path")? + { + client_config.network.invalid_block_storage = Some(path); + } + Ok(client_config) } diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index 7e647c904d..75bcccc9de 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -2199,3 +2199,24 @@ fn disable_optimistic_finalized_sync() { assert!(!config.chain.optimistic_finalized_sync); }); } + +#[test] +fn invalid_gossip_verified_blocks_path_default() { + CommandLineTest::new() + .run_with_zero_port() + .with_config(|config| assert_eq!(config.network.invalid_block_storage, None)); +} + +#[test] +fn invalid_gossip_verified_blocks_path() { + let path = "/home/karlm/naughty-blocks"; + CommandLineTest::new() + .flag("invalid-gossip-verified-blocks-path", Some(path)) + .run_with_zero_port() + .with_config(|config| { + assert_eq!( + config.network.invalid_block_storage, + Some(PathBuf::from(path)) + ) + }); +}