diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index aa9332c01d..33da14f53b 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -11,6 +11,7 @@ use operation_pool::{OperationPool, PersistedOperationPool}; use parking_lot::{RwLock, RwLockReadGuard}; use slog::{error, info, warn, Logger}; use slot_clock::SlotClock; +use ssz::Encode; use state_processing::per_block_processing::{ errors::{ AttestationValidationError, AttesterSlashingValidationError, DepositValidationError, @@ -21,6 +22,8 @@ use state_processing::per_block_processing::{ use state_processing::{ per_block_processing, per_slot_processing, BlockProcessingError, BlockSignatureStrategy, }; +use std::fs; +use std::io::prelude::*; use std::sync::Arc; use std::time::Duration; use store::iter::{BlockRootsIterator, StateRootsIterator}; @@ -1035,6 +1038,8 @@ impl BeaconChain { metrics::stop_timer(db_read_timer); + write_block(&block, block_root, &self.log); + let catchup_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_CATCHUP_STATE); // Keep a list of any states that were "skipped" (block-less) in between the parent state @@ -1059,6 +1064,12 @@ impl BeaconChain { metrics::stop_timer(committee_timer); + write_state( + &format!("state_pre_block_{}", block_root), + &state, + &self.log, + ); + let core_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_CORE); // Apply the received block to its parent state (which has been transitioned into this @@ -1083,6 +1094,12 @@ impl BeaconChain { let state_root = state.canonical_root(); + write_state( + &format!("state_post_block_{}", block_root), + &state, + &self.log, + ); + if block.state_root != state_root { return Ok(BlockProcessingOutcome::StateRootMismatch { block: block.state_root, @@ -1445,6 +1462,45 @@ impl BeaconChain { } } +fn write_state(prefix: &str, state: &BeaconState, log: &Logger) { + let root = Hash256::from_slice(&state.tree_hash_root()); + let filename = format!("{}_slot_{}_root_{}.ssz", prefix, state.slot, root); + let mut path = std::env::temp_dir().join("lighthouse"); + let _ = fs::create_dir_all(path.clone()); + path = path.join(filename); + + match fs::File::create(path.clone()) { + Ok(mut file) => { + let _ = file.write_all(&state.as_ssz_bytes()); + } + Err(e) => error!( + log, + "Failed to log state"; + "path" => format!("{:?}", path), + "error" => format!("{:?}", e) + ), + } +} + +fn write_block(block: &BeaconBlock, root: Hash256, log: &Logger) { + let filename = format!("block_slot_{}_root{}.ssz", block.slot, root); + let mut path = std::env::temp_dir().join("lighthouse"); + let _ = fs::create_dir_all(path.clone()); + path = path.join(filename); + + match fs::File::create(path.clone()) { + Ok(mut file) => { + let _ = file.write_all(&block.as_ssz_bytes()); + } + Err(e) => error!( + log, + "Failed to log block"; + "path" => format!("{:?}", path), + "error" => format!("{:?}", e) + ), + } +} + impl From for Error { fn from(e: DBError) -> Error { Error::DBError(e) diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index 06fc06ddea..ffeba96ec5 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -13,6 +13,7 @@ store = { path = "../store" } eth2-libp2p = { path = "../eth2-libp2p" } types = { path = "../../eth2/types" } slog = { version = "^2.2.3" , features = ["max_level_trace"] } +hex = "0.3" eth2_ssz = "0.1" tree_hash = "0.1" futures = "0.1.25" diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index c54c481c73..83aa7ebd29 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -1,14 +1,17 @@ use super::manager::SyncMessage; use crate::service::NetworkMessage; -use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; +use beacon_chain::{ + AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, BlockProcessingOutcome, +}; use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::{RPCEvent, RPCRequest, RPCResponse, RequestId}; use eth2_libp2p::PeerId; -use slog::{debug, info, o, trace, warn}; +use slog::{debug, error, info, o, trace, warn}; use ssz::Encode; use std::sync::Arc; use store::Store; use tokio::sync::{mpsc, oneshot}; +use tree_hash::SignedRoot; use types::{Attestation, BeaconBlock, Epoch, EthSpec, Hash256, Slot}; //TODO: Put a maximum limit on the number of block that can be requested. @@ -386,8 +389,8 @@ impl MessageProcessor { /// /// Returns a `bool` which, if `true`, indicates we should forward the block to our peers. pub fn on_block_gossip(&mut self, peer_id: PeerId, block: BeaconBlock) -> bool { - if let Ok(outcome) = self.chain.process_block(block.clone()) { - match outcome { + match self.chain.process_block(block.clone()) { + Ok(outcome) => match outcome { BlockProcessingOutcome::Processed { .. } => { trace!(self.log, "Gossipsub block processed"; "peer_id" => format!("{:?}",peer_id)); @@ -408,10 +411,36 @@ impl MessageProcessor { SHOULD_FORWARD_GOSSIP_BLOCK } BlockProcessingOutcome::BlockIsAlreadyKnown => SHOULD_FORWARD_GOSSIP_BLOCK, - _ => SHOULD_NOT_FORWARD_GOSSIP_BLOCK, //TODO: Decide if we want to forward these + other => { + warn!( + self.log, + "Invalid gossip beacon block"; + "outcome" => format!("{:?}", other), + "block root" => format!("{}", Hash256::from_slice(&block.signed_root()[..])), + "block slot" => block.slot + ); + trace!( + self.log, + "Invalid gossip beacon block ssz"; + "ssz" => format!("0x{}", hex::encode(block.as_ssz_bytes())), + ); + SHOULD_NOT_FORWARD_GOSSIP_BLOCK //TODO: Decide if we want to forward these + } + }, + Err(e) => { + error!( + self.log, + "Error processing gossip beacon block"; + "error" => format!("{:?}", e), + "block slot" => block.slot + ); + trace!( + self.log, + "Erroneous gossip beacon block ssz"; + "ssz" => format!("0x{}", hex::encode(block.as_ssz_bytes())), + ); + SHOULD_NOT_FORWARD_GOSSIP_BLOCK } - } else { - SHOULD_NOT_FORWARD_GOSSIP_BLOCK } } @@ -419,15 +448,30 @@ impl MessageProcessor { /// /// Not currently implemented. pub fn on_attestation_gossip(&mut self, _peer_id: PeerId, msg: Attestation) { - match self.chain.process_attestation(msg) { - Ok(outcome) => info!( - self.log, - "Processed attestation"; - "source" => "gossip", - "outcome" => format!("{:?}", outcome) - ), + match self.chain.process_attestation(msg.clone()) { + Ok(outcome) => { + info!( + self.log, + "Processed attestation"; + "source" => "gossip", + "outcome" => format!("{:?}", outcome) + ); + + if outcome != AttestationProcessingOutcome::Processed { + trace!( + self.log, + "Invalid gossip attestation ssz"; + "ssz" => format!("0x{}", hex::encode(msg.as_ssz_bytes())), + ); + } + } Err(e) => { - warn!(self.log, "InvalidAttestation"; "source" => "gossip", "error" => format!("{:?}", e)) + trace!( + self.log, + "Erroneous gossip attestation ssz"; + "ssz" => format!("0x{}", hex::encode(msg.as_ssz_bytes())), + ); + error!(self.log, "Invalid gossip attestation"; "error" => format!("{:?}", e)); } } } diff --git a/eth2/state_processing/Cargo.toml b/eth2/state_processing/Cargo.toml index 65d5a2f309..633c5bfeff 100644 --- a/eth2/state_processing/Cargo.toml +++ b/eth2/state_processing/Cargo.toml @@ -15,6 +15,7 @@ serde = "1.0" serde_derive = "1.0" lazy_static = "0.1" serde_yaml = "0.8" +eth2_ssz = { path = "../utils/ssz" } beacon_chain = { path = "../../beacon_node/beacon_chain" } store = { path = "../../beacon_node/store" } lmd_ghost = { path = "../lmd_ghost" } diff --git a/eth2/state_processing/benches/benches.rs b/eth2/state_processing/benches/benches.rs index 28afd06143..bdbe57b8ed 100644 --- a/eth2/state_processing/benches/benches.rs +++ b/eth2/state_processing/benches/benches.rs @@ -2,6 +2,7 @@ extern crate env_logger; use criterion::Criterion; use criterion::{black_box, criterion_group, criterion_main, Benchmark}; +use ssz::Encode; use state_processing::{test_utils::BlockBuilder, BlockSignatureStrategy, VerifySignatures}; use types::{BeaconBlock, BeaconState, ChainSpec, EthSpec, MainnetEthSpec, MinimalEthSpec, Slot}; @@ -393,6 +394,32 @@ fn bench_block( }) .sample_size(10), ); + + let local_block = block.clone(); + c.bench( + &title, + Benchmark::new("ssz_serialize_block", move |b| { + b.iter_batched_ref( + || (), + |_| black_box(local_block.as_ssz_bytes()), + criterion::BatchSize::SmallInput, + ) + }) + .sample_size(10), + ); + + let local_block = block.clone(); + c.bench( + &title, + Benchmark::new("ssz_block_len", move |b| { + b.iter_batched_ref( + || (), + |_| black_box(local_block.ssz_bytes_len()), + criterion::BatchSize::SmallInput, + ) + }) + .sample_size(10), + ); } criterion_group!(benches, all_benches,); diff --git a/eth2/utils/bls/src/signature_set.rs b/eth2/utils/bls/src/signature_set.rs index 4b6065f9fd..df1636f1d4 100644 --- a/eth2/utils/bls/src/signature_set.rs +++ b/eth2/utils/bls/src/signature_set.rs @@ -7,7 +7,7 @@ use milagro_bls::AggregateSignature as RawAggregateSignature; type Message = Vec; type Domain = u64; -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct SignedMessage<'a> { signing_keys: Vec<&'a G1Point>, message: Message, @@ -25,7 +25,7 @@ impl<'a> SignedMessage<'a> { } } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct SignatureSet<'a> { pub signature: &'a G2Point, signed_messages: Vec>,