Compare commits


10 Commits

Author SHA1 Message Date
Age Manning
33b2a3d0e0 Version bump to v0.2.5 (#1540)
## Description

Version bumps lighthouse to v0.2.5
2020-08-18 11:23:08 +00:00
Paul Hauner
93b7c3b7ff Set default max skips to 700 (#1542)
## Issue Addressed

NA

## Proposed Changes

Sets the default max skips to 700 so that it can cover the 693-slot skip from `80894 - 80201`.

## Additional Info

NA
2020-08-18 09:27:04 +00:00
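The skip-slot limit described above can be sketched as a simple slot comparison. This is a minimal, standalone illustration (the function name `exceeds_skip_limit` is hypothetical; the real check lives in the beacon chain's block/attestation verification):

```rust
// Hedged sketch of the skip-slot limit: reject anything whose slot is more
// than `max_skip_slots` ahead of its parent. `None` means no limit configured.
fn exceeds_skip_limit(parent_slot: u64, slot: u64, max_skip_slots: Option<u64>) -> bool {
    match max_skip_slots {
        Some(max) => slot > parent_slot + max,
        None => false,
    }
}

fn main() {
    // The 693-slot skip between slots 80201 and 80894 passes under the new default of 700...
    assert!(!exceeds_skip_limit(80201, 80894, Some(700)));
    // ...but would have been rejected under the previous default of 320 (10 * 32).
    assert!(exceeds_skip_limit(80201, 80894, Some(320)));
    // With no limit configured, nothing is rejected.
    assert!(!exceeds_skip_limit(80201, 80894, None));
    println!("ok");
}
```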
Age Manning
2d0b214b57 Clean up logs (#1541)
## Description

This PR improves some logging for the end-user. 

It downgrades some warning logs and removes the slots per second sync speed if we are syncing and the speed is 0. This is likely because we are syncing from a finalised checkpoint and the head doesn't change.
2020-08-18 08:11:39 +00:00
Paul Hauner
d4f763bbae Fix mistake with attestation skip slots (#1539)
## Issue Addressed

NA

## Proposed Changes

- Fixes a mistake I made in #1530 which resulted in us *not* rejecting attestations that we intended to reject.
- Adds skip-slot checks for blocks earlier in the import process, so that gossip and RPC blocks are also rejected.

## Additional Info

NA
2020-08-18 06:28:26 +00:00
Age Manning
e1e5002d3c Fingerprint Lodestar (#1536)
Fingerprints the Lodestar client
2020-08-18 06:28:24 +00:00
Paul Hauner
46dd530476 Allow import of Prysm keystores (#1535)
## Issue Addressed

- Resolves #1361

## Proposed Changes

Loosens the constraints imposed by EIP-2335 so we can import keys from Prysm.

## Additional Info

NA
2020-08-18 06:28:20 +00:00
Age Manning
8311074d68 Purge out-dated head chains on chain completion (#1538)
## Description

There can be many head chains queued up for completion. Currently we try to process all of these to completion before we consider the node synced. 

In a chaotic network, there can be many of these, and processing them to completion can be very expensive and slow. This PR removes any non-syncing head chains from the queue and re-statuses the peers. If, after we have synced to head on one chain, there is still a valid head chain to download, it will be re-established once the status has been returned. 

This should assist with getting nodes to sync on medalla faster.
2020-08-18 05:22:34 +00:00
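The purge described above amounts to partitioning the queued head chains: keep the ones actively syncing, drop the rest and re-status their peers. A hedged sketch, with illustrative types (`HeadChain` and `purge_outdated` are not the actual range-sync names):

```rust
// Illustrative stand-in for a queued head chain: an id plus whether it is
// currently syncing.
struct HeadChain {
    id: u64,
    syncing: bool,
}

// Keep actively-syncing chains; return the ids of dropped chains so their
// peers can be re-statused (a still-valid chain re-establishes itself once
// the peer's status comes back).
fn purge_outdated(chains: Vec<HeadChain>) -> (Vec<HeadChain>, Vec<u64>) {
    let mut kept = Vec::new();
    let mut to_restatus = Vec::new();
    for chain in chains {
        if chain.syncing {
            kept.push(chain);
        } else {
            to_restatus.push(chain.id);
        }
    }
    (kept, to_restatus)
}

fn main() {
    let chains = vec![
        HeadChain { id: 1, syncing: true },
        HeadChain { id: 2, syncing: false },
        HeadChain { id: 3, syncing: false },
    ];
    let (kept, restatus) = purge_outdated(chains);
    assert_eq!(kept.len(), 1);
    assert_eq!(restatus, vec![2, 3]);
    println!("ok");
}
```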
Age Manning
3bb30754d9 Keep track of failed head chains and prevent re-lookups (#1534)
## Overview

There are forked chains which get referenced by blocks and attestations on a network. Typically, if these chains are very long, we stop looking up the chain and downvote the peer. In extreme circumstances, when many peers are on many chains, the chains can be very deep and performing lookups becomes time consuming. 

This PR adds a cache to known failed chain lookups. This prevents us from starting a parent-lookup (or stopping one half way through) if we have attempted the chain lookup in the past.
2020-08-18 03:54:09 +00:00
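The failed-chain cache above is essentially a fixed-capacity, insertion-ordered set of chain roots. A minimal sketch of that idea (the actual `lru_cache` crate added in this PR may differ in detail, e.g. it depends on `fnv` hashing; `u64` stands in for `Hash256` here):

```rust
use std::collections::{HashSet, VecDeque};

// Hedged sketch of a bounded cache of failed chain roots: membership checks
// via a HashSet, eviction order via a VecDeque.
struct FailedChainCache {
    capacity: usize,
    order: VecDeque<u64>, // roots in insertion order, oldest first
    set: HashSet<u64>,
}

impl FailedChainCache {
    fn new(capacity: usize) -> Self {
        Self { capacity, order: VecDeque::new(), set: HashSet::new() }
    }

    fn insert(&mut self, root: u64) {
        if self.set.insert(root) {
            self.order.push_back(root);
            if self.order.len() > self.capacity {
                // Evict the oldest entry to stay within capacity.
                if let Some(evicted) = self.order.pop_front() {
                    self.set.remove(&evicted);
                }
            }
        }
    }

    fn contains(&self, root: &u64) -> bool {
        self.set.contains(root)
    }
}

fn main() {
    let mut cache = FailedChainCache::new(2);
    cache.insert(1);
    cache.insert(2);
    cache.insert(3); // evicts 1
    assert!(!cache.contains(&1));
    assert!(cache.contains(&2) && cache.contains(&3));
    println!("ok");
}
```

Before starting a parent lookup, sync consults such a cache and skips (and downscores) chains already known to fail.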
Age Manning
cc44a64d15 Limit parallelism of head chain sync (#1527)
## Description

Currently Lighthouse load-balances a single finalized chain across peers; the chain with the most peers is selected. Once synced to the latest finalized epoch, Lighthouse groups its peers into chains by their current head block and syncs them all in parallel. 

This is typically fast and relatively efficient under normal operation. However, if the chain has not finalized in a long time, the head chains can grow quite long. Peers' head chains will update every slot as new blocks are added to the head. Syncing all head chains in parallel is a bottleneck and highly inefficient; block duplication leads to RPC timeouts when attempting to handle all new head chains at once. 

This PR limits the parallelism of head chain syncing to 2: we now sync at most two head chains at a time. This allows for the possibility of sync progressing even when a slow peer holds up one chain via RPC timeouts.
2020-08-18 02:49:24 +00:00
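The grouping-and-capping step above can be sketched as: bucket peers by their reported head block, prefer the chains with the most peers, and take at most two. Names here (`select_head_chains`, the `u64` roots) are illustrative, not the actual `RangeSync` types:

```rust
use std::collections::HashMap;

// Hedged sketch: group (peer_id, head_root) pairs into head chains and
// select at most `max_parallel` of them to sync concurrently.
fn select_head_chains(peer_heads: &[(u32, u64)], max_parallel: usize) -> Vec<(u64, Vec<u32>)> {
    let mut chains: HashMap<u64, Vec<u32>> = HashMap::new();
    for &(peer, head_root) in peer_heads {
        chains.entry(head_root).or_default().push(peer);
    }
    let mut sorted: Vec<(u64, Vec<u32>)> = chains.into_iter().collect();
    // Prefer the chains backed by the most peers.
    sorted.sort_by_key(|(_, peers)| std::cmp::Reverse(peers.len()));
    sorted.truncate(max_parallel);
    sorted
}

fn main() {
    // Six peers spread over three distinct head blocks.
    let peer_heads = [(1u32, 10u64), (2, 10), (3, 10), (4, 20), (5, 20), (6, 30)];
    let selected = select_head_chains(&peer_heads, 2);
    assert_eq!(selected.len(), 2);
    assert_eq!(selected[0].0, 10); // 3 peers
    assert_eq!(selected[1].0, 20); // 2 peers
    println!("ok");
}
```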
divma
46dbf027af Do not reset batch ids & redownload out of range batches (#1528)
The changes are somewhat simple but should solve two issues:
- When quickly changing between chains once and then a second time back again, batch ids would collide and cause havoc. 
- If we got an out-of-range response from a peer, sync would remain in the syncing state without advancing.

Changes:
- remove the batch id. Identify each batch (inside a chain) by its starting epoch. Target epochs for downloading and processing now advance by EPOCHS_PER_BATCH
- for the same reason, make "to_be_downloaded_id" an epoch
- remove a sneaky line that dropped an out-of-range batch without downloading it
- bonus: put the chain_id in the log given to the chain. This is why the explicit chain_id logging is removed
2020-08-18 01:29:51 +00:00
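Identifying batches by starting epoch, as described above, makes batch ids stable across chain switches: successive targets simply advance by EPOCHS_PER_BATCH. A hedged sketch (the constant's value and the helper name are illustrative):

```rust
// Illustrative value; the real constant lives in the range-sync module.
const EPOCHS_PER_BATCH: u64 = 2;

// Starting epochs of the next `n` batches of a chain that begins at
// `first_epoch`. Each epoch uniquely identifies a batch within the chain,
// so no separate counter-based batch id is needed.
fn batch_start_epochs(first_epoch: u64, n: usize) -> Vec<u64> {
    (0..n as u64).map(|i| first_epoch + i * EPOCHS_PER_BATCH).collect()
}

fn main() {
    assert_eq!(batch_start_epochs(10, 3), vec![10, 12, 14]);
    println!("ok");
}
```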
35 changed files with 827 additions and 346 deletions

Cargo.lock generated
View File

@@ -2,7 +2,7 @@
# It is not intended for manual editing.
[[package]]
name = "account_manager"
version = "0.2.3"
version = "0.2.5"
dependencies = [
"account_utils",
"bls",
@@ -77,7 +77,7 @@ version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fc95d1bdb8e6666b2b217308eeeb09f2d6728d104be3e31916cc74d15420331"
dependencies = [
"generic-array 0.14.3",
"generic-array 0.14.4",
]
[[package]]
@@ -361,7 +361,7 @@ dependencies = [
"slog-term",
"sloggers",
"slot_clock",
"smallvec 1.4.1",
"smallvec 1.4.2",
"state_processing",
"store",
"tempfile",
@@ -373,7 +373,7 @@ dependencies = [
[[package]]
name = "beacon_node"
version = "0.2.3"
version = "0.2.5"
dependencies = [
"beacon_chain",
"clap",
@@ -478,7 +478,7 @@ version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4"
dependencies = [
"generic-array 0.14.3",
"generic-array 0.14.4",
]
[[package]]
@@ -487,7 +487,7 @@ version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa136449e765dc7faa244561ccae839c394048667929af599b5d931ebe7b7f10"
dependencies = [
"generic-array 0.14.3",
"generic-array 0.14.4",
]
[[package]]
@@ -530,7 +530,7 @@ dependencies = [
[[package]]
name = "boot_node"
version = "0.2.3"
version = "0.2.5"
dependencies = [
"clap",
"discv5",
@@ -635,7 +635,7 @@ dependencies = [
"ethereum-types",
"quickcheck",
"quickcheck_macros",
"smallvec 1.4.1",
"smallvec 1.4.2",
"tree_hash",
]
@@ -685,9 +685,9 @@ dependencies = [
[[package]]
name = "chrono"
version = "0.4.13"
version = "0.4.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c74d84029116787153e02106bf53e66828452a4b325cc8652b788b5967c0a0b6"
checksum = "942f72db697d8767c22d46a598e01f2d3b475501ea43d0db4f16d90259182d0b"
dependencies = [
"num-integer",
"num-traits",
@@ -696,9 +696,9 @@ dependencies = [
[[package]]
name = "clap"
version = "2.33.2"
version = "2.33.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10040cdf04294b565d9e0319955430099ec3813a64c952b86a41200ad714ae48"
checksum = "37e58ac78573c40708d45522f0d80fa2f01cc4f9b4e2bf749807255454312002"
dependencies = [
"ansi_term",
"atty",
@@ -1011,7 +1011,7 @@ version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b584a330336237c1eecd3e94266efb216c56ed91225d634cb2991c5f3fd1aeab"
dependencies = [
"generic-array 0.14.3",
"generic-array 0.14.4",
"subtle 2.2.3",
]
@@ -1091,9 +1091,9 @@ dependencies = [
[[package]]
name = "data-encoding"
version = "2.2.1"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72aa14c04dfae8dd7d8a2b1cb7ca2152618cd01336dbfe704b8dcbf8d41dbd69"
checksum = "d4d0e2d24e5ee3b23a01de38eefdcd978907890701f08ffffd4cb457ca4ee8d6"
[[package]]
name = "db-key"
@@ -1163,7 +1163,7 @@ version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066"
dependencies = [
"generic-array 0.14.3",
"generic-array 0.14.4",
]
[[package]]
@@ -1218,7 +1218,7 @@ dependencies = [
"rand 0.7.3",
"rlp",
"sha2 0.8.2",
"smallvec 1.4.1",
"smallvec 1.4.2",
"tokio 0.2.22",
"uint",
"zeroize",
@@ -1284,9 +1284,9 @@ dependencies = [
[[package]]
name = "either"
version = "1.5.3"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb1f6b1ce1c140482ea30ddd3335fc0024ac7ee112895426e0a629a6c20adfe3"
checksum = "cd56b59865bce947ac5958779cfa508f6c3b9497cc762b7e24a12d11ccde2c4f"
[[package]]
name = "encoding_rs"
@@ -1299,9 +1299,9 @@ dependencies = [
[[package]]
name = "enr"
version = "0.1.2"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c78d64a14865c080072c05ffb2c11aab15963d5e763ca4dbc02865dc1b615ee"
checksum = "a3137b4854534673ea350751670c6fe53920394a328ba9ce4d9acabd4f60a586"
dependencies = [
"base64 0.12.3",
"bs58",
@@ -1504,7 +1504,7 @@ dependencies = [
"slog-async",
"slog-stdlog",
"slog-term",
"smallvec 1.4.1",
"smallvec 1.4.2",
"snap",
"tempdir",
"tiny-keccak 2.0.2",
@@ -1522,7 +1522,7 @@ version = "0.1.2"
dependencies = [
"eth2_ssz_derive",
"ethereum-types",
"smallvec 1.4.1",
"smallvec 1.4.2",
]
[[package]]
@@ -1916,9 +1916,9 @@ dependencies = [
[[package]]
name = "generic-array"
version = "0.14.3"
version = "0.14.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60fb4bb6bba52f78a471264d9a3b7d026cc0af47b22cd2cffbc0b787ca003e63"
checksum = "501466ecc8a30d1d3b7fc9229b122b2ce8ed6e9d9223f1138d4babb253e51817"
dependencies = [
"typenum",
"version_check 0.9.2",
@@ -2069,14 +2069,14 @@ checksum = "d36fab90f82edc3c747f9d438e06cf0a491055896f2a279638bb5beed6c40177"
[[package]]
name = "handlebars"
version = "3.3.0"
version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86dbc8a0746b08f363d2e00da48e6c9ceb75c198ac692d2715fcbb5bee74c87d"
checksum = "5deefd4816fb852b1ff3cb48f6c41da67be2d0e1d20b26a7a3b076da11f064b1"
dependencies = [
"log 0.4.11",
"pest",
"pest_derive",
"quick-error",
"quick-error 2.0.0",
"serde",
"serde_json",
]
@@ -2093,9 +2093,9 @@ dependencies = [
[[package]]
name = "hashbrown"
version = "0.8.1"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34f595585f103464d8d2f6e9864682d74c1601fed5e07d62b1c9058dba8246fb"
checksum = "e91b62f79061a0bc2e046024cb7ba44b08419ed238ecbd9adbd787434b9e8c25"
dependencies = [
"autocfg 1.0.0",
]
@@ -2241,7 +2241,7 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f"
dependencies = [
"quick-error",
"quick-error 1.2.3",
]
[[package]]
@@ -2399,7 +2399,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86b45e59b16c76b11bf9738fd5d38879d3bd28ad292d7b313608becb17ae2df9"
dependencies = [
"autocfg 1.0.0",
"hashbrown 0.8.1",
"hashbrown 0.8.2",
]
[[package]]
@@ -2517,13 +2517,13 @@ dependencies = [
[[package]]
name = "lazycell"
version = "1.2.1"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b294d6fa9ee409a054354afc4352b0b9ef7ca222c69b8812cbea9e7d2bf3783f"
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "lcli"
version = "0.2.2"
version = "0.2.5"
dependencies = [
"bls",
"clap",
@@ -2627,7 +2627,7 @@ dependencies = [
"parity-multiaddr 0.9.1 (git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803)",
"parking_lot 0.10.2",
"pin-project",
"smallvec 1.4.1",
"smallvec 1.4.2",
"wasm-timer",
]
@@ -2658,7 +2658,7 @@ dependencies = [
"ring",
"rw-stream-sink",
"sha2 0.8.2",
"smallvec 1.4.1",
"smallvec 1.4.2",
"thiserror",
"unsigned-varint 0.4.0",
"void",
@@ -2691,7 +2691,7 @@ dependencies = [
"ring",
"rw-stream-sink",
"sha2 0.8.2",
"smallvec 1.4.1",
"smallvec 1.4.2",
"thiserror",
"unsigned-varint 0.4.0",
"void",
@@ -2736,7 +2736,7 @@ dependencies = [
"prost-build",
"rand 0.7.3",
"sha2 0.8.2",
"smallvec 1.4.1",
"smallvec 1.4.2",
"unsigned-varint 0.4.0",
"wasm-timer",
]
@@ -2752,7 +2752,7 @@ dependencies = [
"log 0.4.11",
"prost",
"prost-build",
"smallvec 1.4.1",
"smallvec 1.4.2",
"wasm-timer",
]
@@ -2801,7 +2801,7 @@ dependencies = [
"libp2p-core 0.21.0",
"log 0.4.11",
"rand 0.7.3",
"smallvec 1.4.1",
"smallvec 1.4.2",
"void",
"wasm-timer",
]
@@ -2869,9 +2869,9 @@ dependencies = [
[[package]]
name = "libz-sys"
version = "1.0.25"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2eb5e43362e38e2bca2fd5f5134c4d4564a23a5c28e9b95411652021a8675ebe"
checksum = "af67924b8dd885cccea261866c8ce5b74d239d272e154053ff927dae839f5ae9"
dependencies = [
"cc",
"libc",
@@ -2881,7 +2881,7 @@ dependencies = [
[[package]]
name = "lighthouse"
version = "0.2.3"
version = "0.2.5"
dependencies = [
"account_manager",
"account_utils",
@@ -2993,6 +2993,13 @@ dependencies = [
"linked-hash-map",
]
[[package]]
name = "lru_cache"
version = "0.1.0"
dependencies = [
"fnv",
]
[[package]]
name = "lru_time_cache"
version = "0.10.0"
@@ -3205,7 +3212,7 @@ dependencies = [
"futures 0.3.5",
"log 0.4.11",
"pin-project",
"smallvec 1.4.1",
"smallvec 1.4.2",
"unsigned-varint 0.4.0",
]
@@ -3219,7 +3226,7 @@ dependencies = [
"futures 0.3.5",
"log 0.4.11",
"pin-project",
"smallvec 1.4.1",
"smallvec 1.4.2",
"unsigned-varint 0.4.0",
]
@@ -3271,6 +3278,7 @@ dependencies = [
"itertools 0.9.0",
"lazy_static",
"lighthouse_metrics",
"lru_cache",
"matches",
"num_cpus",
"parking_lot 0.11.0",
@@ -3280,7 +3288,7 @@ dependencies = [
"slog",
"sloggers",
"slot_clock",
"smallvec 1.4.1",
"smallvec 1.4.2",
"state_processing",
"store",
"tempfile",
@@ -3353,7 +3361,7 @@ dependencies = [
"num-traits",
"rand 0.7.3",
"serde",
"smallvec 1.4.1",
"smallvec 1.4.2",
"zeroize",
]
@@ -3405,11 +3413,11 @@ checksum = "1ab52be62400ca80aa00285d25253d7f7c437b7375c4de678f5405d3afe82ca5"
[[package]]
name = "once_cell"
version = "1.4.0"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d"
checksum = "260e51e7efe62b592207e9e13a68e43692a7a279171d6ba57abd208bf23645ad"
dependencies = [
"parking_lot 0.10.2",
"parking_lot 0.11.0",
]
[[package]]
@@ -3593,7 +3601,7 @@ dependencies = [
"cloudabi 0.0.3",
"libc",
"redox_syscall",
"smallvec 1.4.1",
"smallvec 1.4.2",
"winapi 0.3.9",
]
@@ -3608,7 +3616,7 @@ dependencies = [
"instant",
"libc",
"redox_syscall",
"smallvec 1.4.1",
"smallvec 1.4.2",
"winapi 0.3.9",
]
@@ -3902,9 +3910,9 @@ dependencies = [
[[package]]
name = "protobuf"
version = "2.16.2"
version = "2.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d883f78645c21b7281d21305181aa1f4dd9e9363e7cf2566c93121552cff003e"
checksum = "cb14183cc7f213ee2410067e1ceeadba2a7478a59432ff0747a335202798b1e2"
[[package]]
name = "psutil"
@@ -3931,6 +3939,12 @@ version = "1.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
[[package]]
name = "quick-error"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ac73b1112776fc109b2e61909bc46c7e1bf0d7f690ffb1676553acce16d5cda"
[[package]]
name = "quickcheck"
version = "0.9.2"
@@ -4346,7 +4360,7 @@ dependencies = [
"libsqlite3-sys",
"lru-cache",
"memchr",
"smallvec 1.4.1",
"smallvec 1.4.2",
"time 0.1.43",
]
@@ -4391,9 +4405,9 @@ dependencies = [
[[package]]
name = "rustls"
version = "0.18.0"
version = "0.18.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cac94b333ee2aac3284c5b8a1b7fb4dd11cba88c244e3fe33cdbd047af0eb693"
checksum = "5d1126dcf58e93cee7d098dbda643b5f92ed724f1f6a63007c1116eed6700c81"
dependencies = [
"base64 0.12.3",
"log 0.4.11",
@@ -4560,9 +4574,9 @@ checksum = "a0eddf2e8f50ced781f288c19f18621fa72a3779e3cb58dbf23b07469b0abeb4"
[[package]]
name = "serde"
version = "1.0.114"
version = "1.0.115"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5317f7588f0a5078ee60ef675ef96735a1442132dc645eb1d12c018620ed8cd3"
checksum = "e54c9a88f2da7238af84b5101443f0c0d0a3bbdc455e34a5c9497b1903ed55d5"
dependencies = [
"serde_derive",
]
@@ -4579,9 +4593,9 @@ dependencies = [
[[package]]
name = "serde_derive"
version = "1.0.114"
version = "1.0.115"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a0be94b04690fbaed37cddffc5c134bf537c8e3329d53e982fe04c374978f8e"
checksum = "609feed1d0a73cc36a0182a840a9b37b4a82f0b1150369f0536a9e3f2a31dc48"
dependencies = [
"proc-macro2",
"quote",
@@ -4885,9 +4899,9 @@ dependencies = [
[[package]]
name = "smallvec"
version = "1.4.1"
version = "1.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3757cb9d89161a2f24e1cf78efa0c1fcff485d18e3f55e0aa3480824ddaa0f3f"
checksum = "fbee7696b84bbf3d89a1c2eccff0850e3047ed46bfcd2e92c29a2d074d57e252"
[[package]]
name = "snafu"
@@ -5100,7 +5114,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09f8ed9974042b8c3672ff3030a69fcc03b74c47c3d1ecb7755e8a3626011e88"
dependencies = [
"block-cipher",
"generic-array 0.14.3",
"generic-array 0.14.4",
]
[[package]]
@@ -5380,9 +5394,9 @@ dependencies = [
[[package]]
name = "tinyvec"
version = "0.3.3"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53953d2d3a5ad81d9f844a32f14ebb121f50b650cd59d0ee2a07cf13c617efed"
checksum = "238ce071d267c5710f9d31451efec16c5ee22de34df17cc05e56cbc92e967117"
[[package]]
name = "tokio"
@@ -5721,9 +5735,9 @@ checksum = "e987b6bf443f4b5b3b6f38704195592cca41c5bb7aedd3c3693c7081f8289860"
[[package]]
name = "tracing"
version = "0.1.18"
version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0aae59226cf195d8e74d4b34beae1859257efb4e5fed3f147d2dc2c7d372178"
checksum = "6d79ca061b032d6ce30c660fded31189ca0b9922bf483cd70759f13a2d86786c"
dependencies = [
"cfg-if",
"log 0.4.11",
@@ -5732,9 +5746,9 @@ dependencies = [
[[package]]
name = "tracing-core"
version = "0.1.13"
version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d593f98af59ebc017c0648f0117525db358745a8894a8d684e185ba3f45954f9"
checksum = "db63662723c316b43ca36d833707cc93dff82a02ba3d7e354f342682cc8b3545"
dependencies = [
"lazy_static",
]
@@ -5773,7 +5787,7 @@ dependencies = [
"ethereum-types",
"lazy_static",
"rand 0.7.3",
"smallvec 1.4.1",
"smallvec 1.4.2",
"tree_hash_derive",
"types",
]
@@ -5857,9 +5871,9 @@ checksum = "c6ff93345ba2206230b1bb1aa3ece1a63dd9443b7531024575d16a0680a59444"
[[package]]
name = "uint"
version = "0.8.4"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "429ffcad8c8c15f874578c7337d156a3727eb4a1c2374c0ae937ad9a9b748c80"
checksum = "9db035e67dfaf7edd9aebfe8676afcd63eed53c8a4044fed514c8cccf1835177"
dependencies = [
"arbitrary",
"byteorder",
@@ -5934,7 +5948,7 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8326b2c654932e3e4f9196e69d08fdf7cfd718e1dc6f66b347e6024a0c961402"
dependencies = [
"generic-array 0.14.3",
"generic-array 0.14.4",
"subtle 2.2.3",
]
@@ -6003,7 +6017,7 @@ dependencies = [
[[package]]
name = "validator_client"
version = "0.2.3"
version = "0.2.5"
dependencies = [
"account_utils",
"bls",

View File

@@ -28,6 +28,7 @@ members = [
"common/lighthouse_metrics",
"common/lighthouse_version",
"common/logging",
"common/lru_cache",
"common/remote_beacon_node",
"common/rest_types",
"common/slot_clock",

View File

@@ -1,6 +1,6 @@
[package]
name = "account_manager"
version = "0.2.4"
version = "0.2.5"
authors = ["Paul Hauner <paul@paulhauner.com>", "Luke Anderson <luke@sigmaprime.io>"]
edition = "2018"

View File

@@ -1,6 +1,6 @@
[package]
name = "beacon_node"
version = "0.2.4"
version = "0.2.5"
authors = ["Paul Hauner <paul@paulhauner.com>", "Age Manning <Age@AgeManning.com"]
edition = "2018"

View File

@@ -549,7 +549,7 @@ fn verify_head_block_is_known<T: BeaconChainTypes>(
{
// Reject any block that exceeds our limit on skipped slots.
if let Some(max_skip_slots) = max_skip_slots {
if block.slot > attestation.data.slot + max_skip_slots {
if attestation.data.slot > block.slot + max_skip_slots {
return Err(Error::TooManySkippedSlots {
head_block_slot: block.slot,
attestation_slot: attestation.data.slot,

View File

@@ -431,6 +431,9 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
let (mut parent, block) = load_parent(block, chain)?;
// Reject any block that exceeds our limit on skipped slots.
check_block_skip_slots(chain, &parent.beacon_block.message, &block.message)?;
let state = cheap_state_advance_to_obtain_committees(
&mut parent.beacon_state,
block.slot(),
@@ -517,6 +520,10 @@ impl<T: BeaconChainTypes> SignatureVerifiedBlock<T> {
chain: &BeaconChain<T>,
) -> Result<Self, BlockError<T::EthSpec>> {
let (mut parent, block) = load_parent(block, chain)?;
// Reject any block that exceeds our limit on skipped slots.
check_block_skip_slots(chain, &parent.beacon_block.message, &block.message)?;
let block_root = get_block_root(&block);
let state = cheap_state_advance_to_obtain_committees(
@@ -648,14 +655,7 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> {
}
// Reject any block that exceeds our limit on skipped slots.
if let Some(max_skip_slots) = chain.config.import_max_skip_slots {
if block.slot() > parent.beacon_block.slot() + max_skip_slots {
return Err(BlockError::TooManySkippedSlots {
parent_slot: parent.beacon_block.slot(),
block_slot: block.slot(),
});
}
}
check_block_skip_slots(chain, &parent.beacon_block.message, &block.message)?;
/*
* Perform cursory checks to see if the block is even worth processing.
@@ -799,6 +799,30 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> {
}
}
/// Check that the count of skip slots between the block and its parent does not exceed our maximum
/// value.
///
/// Whilst this is not part of the specification, we include this to help prevent us from DoS
/// attacks. In times of dire network circumstance, the user can configure the
/// `import_max_skip_slots` value.
fn check_block_skip_slots<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
parent: &BeaconBlock<T::EthSpec>,
block: &BeaconBlock<T::EthSpec>,
) -> Result<(), BlockError<T::EthSpec>> {
// Reject any block that exceeds our limit on skipped slots.
if let Some(max_skip_slots) = chain.config.import_max_skip_slots {
if block.slot > parent.slot + max_skip_slots {
return Err(BlockError::TooManySkippedSlots {
parent_slot: parent.slot,
block_slot: block.slot,
});
}
}
Ok(())
}
/// Returns `Ok(())` if the block is later than the finalized slot on `chain`.
///
/// Returns an error if the block is earlier or equal to the finalized slot, or there was an error

View File

@@ -1,6 +1,7 @@
use serde_derive::{Deserialize, Serialize};
pub const DEFAULT_IMPORT_BLOCK_MAX_SKIP_SLOTS: u64 = 10 * 32;
/// There is a 693 block skip in the current canonical Medalla chain, we use 700 to be safe.
pub const DEFAULT_IMPORT_BLOCK_MAX_SKIP_SLOTS: u64 = 700;
#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
pub struct ChainConfig {

View File

@@ -122,14 +122,28 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
head_distance.as_u64(),
slot_distance_pretty(head_distance, slot_duration)
);
info!(
log,
"Syncing";
"peers" => peer_count_pretty(connected_peer_count),
"distance" => distance,
"speed" => sync_speed_pretty(speedo.slots_per_second()),
"est_time" => estimated_time_pretty(speedo.estimated_time_till_slot(current_slot)),
);
let speed = speedo.slots_per_second();
let display_speed = speed.map_or(false, |speed| speed != 0.0);
if display_speed {
info!(
log,
"Syncing";
"peers" => peer_count_pretty(connected_peer_count),
"distance" => distance,
"speed" => sync_speed_pretty(speed),
"est_time" => estimated_time_pretty(speedo.estimated_time_till_slot(current_slot)),
);
} else {
info!(
log,
"Syncing";
"peers" => peer_count_pretty(connected_peer_count),
"distance" => distance,
"est_time" => estimated_time_pretty(speedo.estimated_time_till_slot(current_slot)),
);
}
} else if sync_state.is_synced() {
let block_info = if current_slot > head_slot {
" … empty".to_string()
@@ -220,14 +234,37 @@ fn seconds_pretty(secs: f64) -> String {
let hours = d.whole_hours();
let minutes = d.whole_minutes();
let week_string = if weeks == 1 { "week" } else { "weeks" };
let day_string = if days == 1 { "day" } else { "days" };
let hour_string = if hours == 1 { "hr" } else { "hrs" };
let min_string = if minutes == 1 { "min" } else { "mins" };
if weeks > 0 {
format!("{:.0} weeks {:.0} days", weeks, days % DAYS_PER_WEEK)
format!(
"{:.0} {} {:.0} {}",
weeks,
week_string,
days % DAYS_PER_WEEK,
day_string
)
} else if days > 0 {
format!("{:.0} days {:.0} hrs", days, hours % HOURS_PER_DAY)
format!(
"{:.0} {} {:.0} {}",
days,
day_string,
hours % HOURS_PER_DAY,
hour_string
)
} else if hours > 0 {
format!("{:.0} hrs {:.0} mins", hours, minutes % MINUTES_PER_HOUR)
format!(
"{:.0} {} {:.0} {}",
hours,
hour_string,
minutes % MINUTES_PER_HOUR,
min_string
)
} else {
format!("{:.0} mins", minutes)
format!("{:.0} {}", minutes, min_string)
}
}

View File

@@ -30,6 +30,8 @@ pub enum ClientKind {
Teku,
/// A Prysm node.
Prysm,
/// A lodestar node.
Lodestar,
/// An unknown client.
Unknown,
}
@@ -84,6 +86,7 @@ impl std::fmt::Display for Client {
"Prysm: version: {}, os_version: {}",
self.version, self.os_version
),
ClientKind::Lodestar => write!(f, "Lodestar: version: {}", self.version),
ClientKind::Unknown => {
if let Some(agent_string) = &self.agent_string {
write!(f, "Unknown: {}", agent_string)
@@ -157,6 +160,18 @@ fn client_from_agent_version(agent_version: &str) -> (ClientKind, String, String
}
(kind, version, os_version)
}
Some("js-libp2p") => {
let kind = ClientKind::Lodestar;
let mut version = String::from("unknown");
let mut os_version = version.clone();
if let Some(agent_version) = agent_split.next() {
version = agent_version.into();
if let Some(agent_os_version) = agent_split.next() {
os_version = agent_os_version.into();
}
}
(kind, version, os_version)
}
_ => {
let unknown = String::from("unknown");
(ClientKind::Unknown, unknown.clone(), unknown)

View File

@@ -40,3 +40,4 @@ lighthouse_metrics = { path = "../../common/lighthouse_metrics" }
environment = { path = "../../lighthouse/environment" }
itertools = "0.9.0"
num_cpus = "1.13.0"
lru_cache = { path = "../../common/lru_cache" }

View File

@@ -1,21 +1,21 @@
use crate::metrics;
use crate::router::processor::FUTURE_SLOT_TOLERANCE;
use crate::sync::manager::SyncMessage;
use crate::sync::{BatchId, BatchProcessResult, ChainId};
use crate::sync::{BatchProcessResult, ChainId};
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, ChainSegmentResult};
use eth2_libp2p::PeerId;
use slog::{debug, error, trace, warn};
use std::sync::Arc;
use tokio::sync::mpsc;
use types::{EthSpec, SignedBeaconBlock};
use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock};
/// Id associated to a block processing request, either a batch or a single block.
#[derive(Clone, Debug, PartialEq)]
pub enum ProcessId {
/// Processing Id of a range syncing batch.
RangeBatchId(ChainId, BatchId),
/// Processing Id of the parent lookup of a block
ParentLookup(PeerId),
RangeBatchId(ChainId, Epoch),
/// Processing Id of the parent lookup of a block.
ParentLookup(PeerId, Hash256),
}
pub fn handle_chain_segment<T: BeaconChainTypes>(
@@ -27,7 +27,7 @@ pub fn handle_chain_segment<T: BeaconChainTypes>(
) {
match process_id {
// this a request from the range sync
ProcessId::RangeBatchId(chain_id, batch_id) => {
ProcessId::RangeBatchId(chain_id, epoch) => {
let len = downloaded_blocks.len();
let start_slot = if len > 0 {
downloaded_blocks[0].message.slot.as_u64()
@@ -40,26 +40,26 @@ pub fn handle_chain_segment<T: BeaconChainTypes>(
0
};
debug!(log, "Processing batch"; "id" => *batch_id, "blocks" => downloaded_blocks.len(), "start_slot" => start_slot, "end_slot" => end_slot);
debug!(log, "Processing batch"; "batch_epoch" => epoch, "blocks" => downloaded_blocks.len(), "first_block_slot" => start_slot, "last_block_slot" => end_slot, "service" => "sync");
let result = match process_blocks(chain, downloaded_blocks.iter(), &log) {
(_, Ok(_)) => {
debug!(log, "Batch processed"; "id" => *batch_id , "start_slot" => start_slot, "end_slot" => end_slot);
debug!(log, "Batch processed"; "batch_epoch" => epoch , "first_block_slot" => start_slot, "last_block_slot" => end_slot, "service"=> "sync");
BatchProcessResult::Success
}
(imported_blocks, Err(e)) if imported_blocks > 0 => {
debug!(log, "Batch processing failed but imported some blocks";
"id" => *batch_id, "error" => e, "imported_blocks"=> imported_blocks);
"batch_epoch" => epoch, "error" => e, "imported_blocks"=> imported_blocks, "service" => "sync");
BatchProcessResult::Partial
}
(_, Err(e)) => {
debug!(log, "Batch processing failed"; "id" => *batch_id, "error" => e);
debug!(log, "Batch processing failed"; "batch_epoch" => epoch, "error" => e, "service" => "sync");
BatchProcessResult::Failed
}
};
let msg = SyncMessage::BatchProcessed {
chain_id,
batch_id,
epoch,
downloaded_blocks,
result,
};
@@ -71,7 +71,7 @@ pub fn handle_chain_segment<T: BeaconChainTypes>(
});
}
// this a parent lookup request from the sync manager
ProcessId::ParentLookup(peer_id) => {
ProcessId::ParentLookup(peer_id, chain_head) => {
debug!(
log, "Processing parent lookup";
"last_peer_id" => format!("{}", peer_id),
@@ -81,9 +81,9 @@ pub fn handle_chain_segment<T: BeaconChainTypes>(
// reverse
match process_blocks(chain, downloaded_blocks.iter().rev(), &log) {
(_, Err(e)) => {
warn!(log, "Parent lookup failed"; "last_peer_id" => format!("{}", peer_id), "error" => e);
debug!(log, "Parent lookup failed"; "last_peer_id" => format!("{}", peer_id), "error" => e);
sync_send
.send(SyncMessage::ParentLookupFailed(peer_id))
.send(SyncMessage::ParentLookupFailed{peer_id, chain_head})
.unwrap_or_else(|_| {
// on failure, inform to downvote the peer
debug!(

View File

@@ -774,6 +774,13 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
);
return;
}
Err(BlockError::BlockIsAlreadyKnown) => {
debug!(
log,
"Gossip block is already known";
);
return;
}
Err(e) => {
warn!(
log,

View File

@@ -35,7 +35,7 @@
use super::network_context::SyncNetworkContext;
use super::peer_sync_info::{PeerSyncInfo, PeerSyncType};
use super::range_sync::{BatchId, ChainId, RangeSync, EPOCHS_PER_BATCH};
use super::range_sync::{ChainId, RangeSync, EPOCHS_PER_BATCH};
use super::RequestId;
use crate::beacon_processor::{ProcessId, WorkEvent as BeaconWorkEvent};
use crate::service::NetworkMessage;
@@ -44,6 +44,7 @@ use eth2_libp2p::rpc::{methods::MAX_REQUEST_BLOCKS, BlocksByRootRequest, Goodbye
use eth2_libp2p::types::NetworkGlobals;
use eth2_libp2p::{PeerAction, PeerId};
use fnv::FnvHashMap;
use lru_cache::LRUCache;
use slog::{crit, debug, error, info, trace, warn, Logger};
use smallvec::SmallVec;
use ssz_types::VariableList;
@@ -51,7 +52,7 @@ use std::boxed::Box;
use std::ops::Sub;
use std::sync::Arc;
use tokio::sync::mpsc;
use types::{EthSpec, Hash256, SignedBeaconBlock, Slot};
use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot};
/// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync
/// from a peer. If a peer is within this tolerance (forwards or backwards), it is treated as a
@@ -100,13 +101,18 @@ pub enum SyncMessage<T: EthSpec> {
/// A batch has been processed by the block processor thread.
BatchProcessed {
chain_id: ChainId,
batch_id: BatchId,
epoch: Epoch,
downloaded_blocks: Vec<SignedBeaconBlock<T>>,
result: BatchProcessResult,
},
/// A parent lookup has failed for a block given by this `peer_id`.
ParentLookupFailed(PeerId),
/// A parent lookup has failed.
ParentLookupFailed {
/// The head of the chain of blocks that failed to process.
chain_head: Hash256,
/// The peer that instigated the chain lookup.
peer_id: PeerId,
},
}
/// The result of processing a multiple blocks (a chain segment).
@@ -161,6 +167,9 @@ pub struct SyncManager<T: BeaconChainTypes> {
/// A collection of parent block lookups.
parent_queue: SmallVec<[ParentRequests<T::EthSpec>; 3]>,
/// A cache of failed chain lookups to prevent duplicate searches.
failed_chains: LRUCache<Hash256>,
/// A collection of block hashes being searched for and a flag indicating if a result has been
/// received or not.
///
@@ -222,6 +231,7 @@ pub fn spawn<T: BeaconChainTypes>(
network_globals,
input_channel: sync_recv,
parent_queue: SmallVec::new(),
failed_chains: LRUCache::new(500),
single_block_lookups: FnvHashMap::default(),
log: log.clone(),
beacon_processor_send,
@@ -351,6 +361,22 @@ impl<T: BeaconChainTypes> SyncManager<T> {
return;
}
};
// Check if the parent of this block is in our failed cache. If it is, this
// chain should be dropped and the peer downscored.
if self.failed_chains.contains(&block.message.parent_root) {
debug!(self.log, "Parent chain ignored due to past failure"; "block" => format!("{:?}", block.message.parent_root), "slot" => block.message.slot);
if !parent_request.downloaded_blocks.is_empty() {
// Add the root block to failed chains
self.failed_chains
.insert(parent_request.downloaded_blocks[0].canonical_root());
} else {
crit!(self.log, "Parent chain has no blocks");
}
self.network
.report_peer(peer_id, PeerAction::MidToleranceError);
return;
}
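The failed-chain check above relies on a bounded cache of chain roots so repeated lookups of a known-bad chain are rejected cheaply. A minimal sketch of that idea — an insertion-ordered set with oldest-first eviction; the real `lru_cache::LRUCache` type may differ in detail:

```rust
use std::collections::VecDeque;

// Hypothetical stand-in for `LRUCache<Hash256>`: a bounded, insertion-ordered
// set of failed chain roots (u64 stands in for Hash256 here).
struct FailedChains {
    capacity: usize,
    roots: VecDeque<u64>,
}

impl FailedChains {
    fn new(capacity: usize) -> Self {
        Self { capacity, roots: VecDeque::new() }
    }

    fn contains(&self, root: &u64) -> bool {
        self.roots.contains(root)
    }

    fn insert(&mut self, root: u64) {
        if self.roots.contains(&root) {
            return;
        }
        if self.roots.len() == self.capacity {
            // evict the oldest failed chain to stay within capacity
            self.roots.pop_front();
        }
        self.roots.push_back(root);
    }
}

fn main() {
    let mut cache = FailedChains::new(2);
    cache.insert(1);
    cache.insert(2);
    cache.insert(3); // evicts root 1
    assert!(!cache.contains(&1));
    assert!(cache.contains(&2));
    assert!(cache.contains(&3));
}
```

With a capacity of 500 (as in the diff), the cache bounds memory while still suppressing most duplicate searches of recently failed chains.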
// add the block to response
parent_request.downloaded_blocks.push(block);
// queue for processing
@@ -510,6 +536,15 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}
let block_root = block.canonical_root();
// If this block or its parent is part of a known failed chain, ignore it.
if self.failed_chains.contains(&block.message.parent_root)
|| self.failed_chains.contains(&block_root)
{
debug!(self.log, "Block is from a past failed chain. Dropping"; "block_root" => format!("{:?}", block_root), "block_slot" => block.message.slot);
return;
}
// Make sure this block is not already being searched for
// NOTE: Potentially store a hashset of blocks for O(1) lookups
for parent_req in self.parent_queue.iter() {
@@ -697,6 +732,8 @@ impl<T: BeaconChainTypes> SyncManager<T> {
// If the last block in the queue has an unknown parent, we continue the parent
// lookup-search.
let chain_block_hash = parent_request.downloaded_blocks[0].canonical_root();
let newest_block = parent_request
.downloaded_blocks
.pop()
@@ -715,8 +752,10 @@ impl<T: BeaconChainTypes> SyncManager<T> {
self.request_parent(parent_request);
}
Ok(_) | Err(BlockError::BlockIsAlreadyKnown { .. }) => {
let process_id =
ProcessId::ParentLookup(parent_request.last_submitted_peer.clone());
let process_id = ProcessId::ParentLookup(
parent_request.last_submitted_peer.clone(),
chain_block_hash,
);
let blocks = parent_request.downloaded_blocks;
match self
@@ -742,6 +781,10 @@ impl<T: BeaconChainTypes> SyncManager<T> {
"outcome" => format!("{:?}", outcome),
"last_peer" => parent_request.last_submitted_peer.to_string(),
);
// Add this chain to cache of failed chains
self.failed_chains.insert(chain_block_hash);
// This currently can be a host of errors. We permit this due to the partial
// ambiguity.
// TODO: Refine the error types and score the peer appropriately.
@@ -764,8 +807,17 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|| parent_request.downloaded_blocks.len() >= PARENT_DEPTH_TOLERANCE
{
let error = if parent_request.failed_attempts >= PARENT_FAIL_TOLERANCE {
// This is a peer-specific error and the chain could be continued with another
// peer. We don't consider this chain a failure and don't prevent retries with
// another peer.
"too many failed attempts"
} else {
if !parent_request.downloaded_blocks.is_empty() {
self.failed_chains
.insert(parent_request.downloaded_blocks[0].canonical_root());
} else {
crit!(self.log, "Parent lookup has no blocks");
}
"reached maximum lookup-depth"
};
@@ -774,6 +826,11 @@ impl<T: BeaconChainTypes> SyncManager<T> {
"ancestors_found" => parent_request.downloaded_blocks.len(),
"reason" => error
);
// Downscore the peer.
self.network.report_peer(
parent_request.last_submitted_peer,
PeerAction::LowToleranceError,
);
return; // drop the request
}
@@ -842,24 +899,25 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
SyncMessage::BatchProcessed {
chain_id,
batch_id,
epoch,
downloaded_blocks,
result,
} => {
self.range_sync.handle_block_process_result(
&mut self.network,
chain_id,
batch_id,
epoch,
downloaded_blocks,
result,
);
}
SyncMessage::ParentLookupFailed(peer_id) => {
SyncMessage::ParentLookupFailed {
chain_head,
peer_id,
} => {
// A peer sent an object (block or attestation) that referenced a parent.
// On request for this parent the peer indicated it did not have this
// block.
// This is not fatal. Peers could prune old blocks so we moderately
// tolerate this behaviour.
// The processing of this chain failed.
self.failed_chains.insert(chain_head);
self.network
.report_peer(peer_id, PeerAction::MidToleranceError);
}


@@ -8,7 +8,7 @@ mod range_sync;
pub use manager::{BatchProcessResult, SyncMessage};
pub use peer_sync_info::PeerSyncInfo;
pub use range_sync::{BatchId, ChainId};
pub use range_sync::ChainId;
/// Type of id of rpc requests sent by sync
pub type RequestId = usize;


@@ -9,37 +9,14 @@ use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::ops::Sub;
use types::{EthSpec, SignedBeaconBlock, Slot};
#[derive(Copy, Clone, Debug, PartialEq)]
pub struct BatchId(pub u64);
impl std::ops::Deref for BatchId {
type Target = u64;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl std::ops::DerefMut for BatchId {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl std::convert::From<u64> for BatchId {
fn from(id: u64) -> Self {
BatchId(id)
}
}
use types::{Epoch, EthSpec, SignedBeaconBlock, Slot};
/// A collection of sequential blocks that are requested from peers in a single RPC request.
#[derive(PartialEq, Debug)]
pub struct Batch<T: EthSpec> {
/// The ID of the batch, these are sequential.
pub id: BatchId,
/// The requested start slot of the batch, inclusive.
pub start_slot: Slot,
/// The requested end slot of batch, exlcusive.
/// The requested start epoch of the batch.
pub start_epoch: Epoch,
/// The requested end slot of batch, exclusive.
pub end_slot: Slot,
/// The `Attempts` that have been made to send us this batch.
pub attempts: Vec<Attempt>,
@@ -69,10 +46,9 @@ pub struct Attempt {
impl<T: EthSpec> Eq for Batch<T> {}
impl<T: EthSpec> Batch<T> {
pub fn new(id: BatchId, start_slot: Slot, end_slot: Slot, peer_id: PeerId) -> Self {
pub fn new(start_epoch: Epoch, end_slot: Slot, peer_id: PeerId) -> Self {
Batch {
id,
start_slot,
start_epoch,
end_slot,
attempts: Vec::new(),
current_peer: peer_id,
@@ -82,12 +58,21 @@ impl<T: EthSpec> Batch<T> {
}
}
pub fn start_slot(&self) -> Slot {
// batches are shifted by 1
self.start_epoch.start_slot(T::slots_per_epoch()) + 1
}
pub fn end_slot(&self) -> Slot {
self.end_slot
}
pub fn to_blocks_by_range_request(&self) -> BlocksByRangeRequest {
let start_slot = self.start_slot();
BlocksByRangeRequest {
start_slot: self.start_slot.into(),
start_slot: start_slot.into(),
count: min(
T::slots_per_epoch() * EPOCHS_PER_BATCH,
self.end_slot.sub(self.start_slot).into(),
self.end_slot.sub(start_slot).into(),
),
step: 1,
}
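The slot arithmetic above — batches keyed by start epoch, with the request starting one slot past the epoch boundary — can be checked with a small sketch. The 32-slot epoch and two-epoch batch size below are illustrative assumptions, not values taken from this diff:

```rust
// Illustrative constants (assumptions for this sketch).
const SLOTS_PER_EPOCH: u64 = 32;
const EPOCHS_PER_BATCH: u64 = 2;

// Batches are shifted by one slot past the epoch boundary.
fn batch_start_slot(start_epoch: u64) -> u64 {
    start_epoch * SLOTS_PER_EPOCH + 1
}

// Request either a full batch of blocks or just enough to reach `end_slot`
// (exclusive), whichever is smaller.
fn batch_request_count(start_epoch: u64, end_slot: u64) -> u64 {
    std::cmp::min(
        SLOTS_PER_EPOCH * EPOCHS_PER_BATCH,
        end_slot - batch_start_slot(start_epoch),
    )
}

fn main() {
    // A batch starting at epoch 10 requests from slot 321 (= 10 * 32 + 1).
    assert_eq!(batch_start_slot(10), 321);
    // A full two-epoch batch ending (exclusive) at slot 385 requests 64 slots.
    assert_eq!(batch_request_count(10, 385), 64);
}
```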
@@ -105,7 +90,7 @@ impl<T: EthSpec> Batch<T> {
impl<T: EthSpec> Ord for Batch<T> {
fn cmp(&self, other: &Self) -> Ordering {
self.id.0.cmp(&other.id.0)
self.start_epoch.cmp(&other.start_epoch)
}
}
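Comparing batches by `start_epoch`, as the `Ord` impl above now does, keeps the completed-batch queue sorted in chain order without the old sequential `BatchId`. A toy illustration of the same ordering on a single-field struct:

```rust
// Toy batch keyed by start epoch; deriving Ord on the single field mirrors
// comparing `start_epoch` as in the diff above.
#[derive(PartialEq, Eq, PartialOrd, Ord, Debug)]
struct Batch {
    start_epoch: u64,
}

fn main() {
    let mut completed = vec![
        Batch { start_epoch: 6 },
        Batch { start_epoch: 2 },
        Batch { start_epoch: 4 },
    ];
    completed.sort();
    // Batches now come out in chain order.
    assert_eq!(completed[0].start_epoch, 2);
    assert_eq!(completed[2].start_epoch, 6);
}
```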


@@ -1,4 +1,4 @@
use super::batch::{Batch, BatchId, PendingBatches};
use super::batch::{Batch, PendingBatches};
use crate::beacon_processor::ProcessId;
use crate::beacon_processor::WorkEvent as BeaconWorkEvent;
use crate::sync::RequestId;
@@ -73,11 +73,12 @@ pub struct SyncingChain<T: BeaconChainTypes> {
/// and thus available to download this chain from.
pub peer_pool: HashSet<PeerId>,
/// The next batch_id that needs to be downloaded.
to_be_downloaded_id: BatchId,
/// Starting epoch of the next batch that needs to be downloaded.
to_be_downloaded: Epoch,
/// The next batch id that needs to be processed.
to_be_processed_id: BatchId,
/// Starting epoch of the batch that needs to be processed next.
/// This is incremented as the chain advances.
processing_target: Epoch,
/// The current state of the chain.
pub state: ChainSyncingState,
@@ -91,7 +92,7 @@ pub struct SyncingChain<T: BeaconChainTypes> {
/// A reference to the underlying beacon chain.
chain: Arc<BeaconChain<T>>,
/// A reference to the sync logger.
/// The chain's log.
log: slog::Logger,
}
@@ -127,8 +128,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
completed_batches: Vec::new(),
processed_batches: Vec::new(),
peer_pool,
to_be_downloaded_id: BatchId(1),
to_be_processed_id: BatchId(1),
to_be_downloaded: start_epoch,
processing_target: start_epoch,
state: ChainSyncingState::Stopped,
current_processing_batch: None,
beacon_processor_send,
@@ -139,13 +140,10 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
/// Returns the latest slot number that has been processed.
fn current_processed_slot(&self) -> Slot {
self.start_epoch
// the last slot we processed was included in the previous batch, and corresponds to the
// first slot of the current target epoch
self.processing_target
.start_slot(T::EthSpec::slots_per_epoch())
.saturating_add(
self.to_be_processed_id.saturating_sub(1u64)
* T::EthSpec::slots_per_epoch()
* EPOCHS_PER_BATCH,
)
}
/// A batch of blocks has been received. This function gets run on all chains and should
@@ -182,21 +180,19 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// An entire batch of blocks has been received. This function checks to see if it can be processed,
// removes any batches waiting to be verified and, if this chain is syncing, requests new
// blocks for the peer.
debug!(self.log, "Completed batch received"; "id"=> *batch.id, "blocks" => &batch.downloaded_blocks.len(), "awaiting_batches" => self.completed_batches.len());
debug!(self.log, "Completed batch received"; "epoch" => batch.start_epoch, "blocks" => &batch.downloaded_blocks.len(), "awaiting_batches" => self.completed_batches.len());
// verify the range of received blocks
// Note that the order of blocks is verified in block processing
if let Some(last_slot) = batch.downloaded_blocks.last().map(|b| b.slot()) {
// the batch is non-empty
let first_slot = batch.downloaded_blocks[0].slot();
if batch.start_slot > first_slot || batch.end_slot < last_slot {
if batch.start_slot() > first_slot || batch.end_slot() < last_slot {
warn!(self.log, "BlocksByRange response returned out of range blocks";
"response_initial_slot" => first_slot,
"requested_initial_slot" => batch.start_slot);
// This is a pretty bad error. We don't consider this fatal, but we don't tolerate
// this much either.
network.report_peer(batch.current_peer, PeerAction::LowToleranceError);
self.to_be_processed_id = batch.id; // reset the id back to here, when incrementing, it will check against completed batches
"response_initial_slot" => first_slot,
"requested_initial_slot" => batch.start_slot());
// this batch can't be used, so we need to request it again.
self.failed_batch(network, batch);
return;
}
}
@@ -242,7 +238,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// Check if there is a batch ready to be processed
if !self.completed_batches.is_empty()
&& self.completed_batches[0].id == self.to_be_processed_id
&& self.completed_batches[0].start_epoch == self.processing_target
{
let batch = self.completed_batches.remove(0);
@@ -258,7 +254,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
/// Sends a batch to the beacon processor for async processing in a queue.
fn process_batch(&mut self, mut batch: Batch<T::EthSpec>) {
let blocks = std::mem::replace(&mut batch.downloaded_blocks, Vec::new());
let process_id = ProcessId::RangeBatchId(self.id, batch.id);
let process_id = ProcessId::RangeBatchId(self.id, batch.start_epoch);
self.current_processing_batch = Some(batch);
if let Err(e) = self
@@ -280,7 +276,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
&mut self,
network: &mut SyncNetworkContext<T::EthSpec>,
chain_id: ChainId,
batch_id: BatchId,
batch_start_epoch: Epoch,
downloaded_blocks: &mut Option<Vec<SignedBeaconBlock<T::EthSpec>>>,
result: &BatchProcessResult,
) -> Option<ProcessingResult> {
@@ -289,14 +285,14 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
return None;
}
match &self.current_processing_batch {
Some(current_batch) if current_batch.id != batch_id => {
Some(current_batch) if current_batch.start_epoch != batch_start_epoch => {
debug!(self.log, "Unexpected batch result";
"chain_id" => self.id, "batch_id" => *batch_id, "expected_batch_id" => *current_batch.id);
"batch_epoch" => batch_start_epoch, "expected_batch_epoch" => current_batch.start_epoch);
return None;
}
None => {
debug!(self.log, "Chain was not expecting a batch result";
"chain_id" => self.id, "batch_id" => *batch_id);
"batch_epoch" => batch_start_epoch);
return None;
}
_ => {
@@ -308,7 +304,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
let downloaded_blocks = downloaded_blocks.take().or_else(|| {
// if taken by another chain, we are no longer waiting on a result.
self.current_processing_batch = None;
crit!(self.log, "Processed batch taken by another chain"; "chain_id" => self.id);
crit!(self.log, "Processed batch taken by another chain");
None
})?;
@@ -318,16 +314,15 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
batch.downloaded_blocks = downloaded_blocks;
// double check batches are processed in order TODO: Remove for prod
if batch.id != self.to_be_processed_id {
if batch.start_epoch != self.processing_target {
crit!(self.log, "Batch processed out of order";
"chain_id" => self.id,
"processed_batch_id" => *batch.id,
"expected_id" => *self.to_be_processed_id);
"processed_starting_epoch" => batch.start_epoch,
"expected_epoch" => self.processing_target);
}
let res = match result {
BatchProcessResult::Success => {
*self.to_be_processed_id += 1;
self.processing_target += EPOCHS_PER_BATCH;
// If the processed batch was not empty, we can validate previous invalidated
// blocks including the current batch.
@@ -357,7 +352,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
}
BatchProcessResult::Partial => {
warn!(self.log, "Batch processing failed but at least one block was imported";
"chain_id" => self.id, "id" => *batch.id, "peer" => format!("{}", batch.current_peer)
"batch_epoch" => batch.start_epoch, "peer" => batch.current_peer.to_string()
);
// At least one block was successfully verified and imported, so we can be sure all
// previous batches are valid and we only need to download the current failed
@@ -375,7 +370,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
let action = PeerAction::LowToleranceError;
warn!(self.log, "Batch failed to download. Dropping chain scoring peers";
"score_adjustment" => action.to_string(),
"chain_id" => self.id, "id"=> *batch.id);
"batch_epoch"=> batch.start_epoch);
for peer_id in self.peer_pool.drain() {
network.report_peer(peer_id, action);
}
@@ -388,7 +383,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
}
BatchProcessResult::Failed => {
debug!(self.log, "Batch processing failed";
"chain_id" => self.id,"id" => *batch.id, "peer" => batch.current_peer.to_string(), "client" => network.client_type(&batch.current_peer).to_string());
"batch_epoch" => batch.start_epoch, "peer" => batch.current_peer.to_string(), "client" => network.client_type(&batch.current_peer).to_string());
// The batch processing failed
// This could be because this batch is invalid, or a previous invalidated batch
// is invalid. We need to find out which and downvote the peer that has sent us
@@ -403,7 +398,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
let action = PeerAction::LowToleranceError;
warn!(self.log, "Batch failed to download. Dropping chain scoring peers";
"score_adjustment" => action.to_string(),
"chain_id" => self.id, "id"=> *batch.id);
"batch_epoch" => batch.start_epoch);
for peer_id in self.peer_pool.drain() {
network.report_peer(peer_id, action);
}
@@ -433,11 +428,10 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
) {
while !self.processed_batches.is_empty() {
let mut processed_batch = self.processed_batches.remove(0);
if *processed_batch.id >= *last_batch.id {
if processed_batch.start_epoch >= last_batch.start_epoch {
crit!(self.log, "A processed batch had a greater start epoch than the current processing epoch";
"chain_id" => self.id,
"processed_id" => *processed_batch.id,
"current_id" => *last_batch.id);
"processed_start_epoch" => processed_batch.start_epoch,
"current_start_epoch" => last_batch.start_epoch);
}
// Go through passed attempts and downscore peers that returned invalid batches
@@ -452,11 +446,10 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
let action = PeerAction::LowToleranceError;
debug!(
self.log, "Re-processed batch validated. Scoring original peer";
"chain_id" => self.id,
"batch_id" => *processed_batch.id,
"score_adjustment" => action.to_string(),
"original_peer" => format!("{}",attempt.peer_id),
"new_peer" => format!("{}", processed_batch.current_peer)
"batch_epoch" => processed_batch.start_epoch,
"score_adjustment" => action.to_string(),
"original_peer" => format!("{}",attempt.peer_id),
"new_peer" => format!("{}", processed_batch.current_peer)
);
network.report_peer(attempt.peer_id, action);
} else {
@@ -465,11 +458,10 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
let action = PeerAction::MidToleranceError;
debug!(
self.log, "Re-processed batch validated by the same peer.";
"chain_id" => self.id,
"batch_id" => *processed_batch.id,
"score_adjustment" => action.to_string(),
"original_peer" => format!("{}",attempt.peer_id),
"new_peer" => format!("{}", processed_batch.current_peer)
"batch_epoch" => processed_batch.start_epoch,
"score_adjustment" => action.to_string(),
"original_peer" => format!("{}",attempt.peer_id),
"new_peer" => format!("{}", processed_batch.current_peer)
);
network.report_peer(attempt.peer_id, action);
}
@@ -508,7 +500,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// Find any pre-processed batches awaiting validation
while !self.processed_batches.is_empty() {
let past_batch = self.processed_batches.remove(0);
*self.to_be_processed_id = std::cmp::min(*self.to_be_processed_id, *past_batch.id);
self.processing_target = std::cmp::min(self.processing_target, past_batch.start_epoch);
self.reprocess_batch(network, past_batch);
}
@@ -552,11 +544,10 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
batch.current_peer = new_peer.clone();
debug!(self.log, "Re-requesting batch";
"chain_id" => self.id,
"start_slot" => batch.start_slot,
"start_slot" => batch.start_slot(),
"end_slot" => batch.end_slot -1, // The -1 shows inclusive blocks
"id" => *batch.id,
"peer" => format!("{}", batch.current_peer),
"batch_epoch" => batch.start_epoch,
"peer" => batch.current_peer.to_string(),
"retries" => batch.retries,
"re-processes" => batch.reprocess_retries);
self.send_batch(network, batch);
@@ -592,12 +583,11 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
self.start_epoch = local_finalized_epoch;
debug!(self.log, "Updating chain's progress";
"chain_id" => self.id,
"prev_completed_slot" => current_processed_slot,
"new_completed_slot" => self.current_processed_slot());
// Re-index batches
*self.to_be_downloaded_id = 1;
*self.to_be_processed_id = 1;
self.to_be_downloaded = local_finalized_epoch;
self.processing_target = local_finalized_epoch;
// remove any completed or processed batches
self.completed_batches.clear();
@@ -621,7 +611,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// do not request blocks if the chain is not syncing
if let ChainSyncingState::Stopped = self.state {
debug!(self.log, "Peer added to a non-syncing chain";
"chain_id" => self.id, "peer_id" => format!("{}", peer_id));
"peer_id" => format!("{}", peer_id));
return;
}
@@ -650,8 +640,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
) -> Option<ProcessingResult> {
if let Some(batch) = self.pending_batches.remove(request_id) {
debug!(self.log, "Batch failed. RPC Error";
"chain_id" => self.id,
"id" => *batch.id,
"batch_epoch" => batch.start_epoch,
"retries" => batch.retries,
"peer" => format!("{:?}", peer_id));
@@ -688,17 +677,23 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
batch.current_peer = new_peer.clone();
debug!(self.log, "Re-Requesting batch";
"chain_id" => self.id,
"start_slot" => batch.start_slot,
"start_slot" => batch.start_slot(),
"end_slot" => batch.end_slot -1, // The -1 shows inclusive blocks
"id" => *batch.id,
"peer" => format!("{:?}", batch.current_peer));
"batch_epoch" => batch.start_epoch,
"peer" => batch.current_peer.to_string());
self.send_batch(network, batch);
ProcessingResult::KeepChain
}
}
/// Returns true if this chain is currently syncing.
pub fn is_syncing(&self) -> bool {
match self.state {
ChainSyncingState::Syncing => true,
ChainSyncingState::Stopped => false,
}
}
/// Attempts to request the next required batches from the peer pool if the chain is syncing. It will exhaust the peer
/// pool and leftover batches until the batch buffer is reached or all peers are exhausted.
fn request_batches(&mut self, network: &mut SyncNetworkContext<T::EthSpec>) {
@@ -714,10 +709,9 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
if let Some(peer_id) = self.get_next_peer() {
if let Some(batch) = self.get_next_batch(peer_id) {
debug!(self.log, "Requesting batch";
"chain_id" => self.id,
"start_slot" => batch.start_slot,
"end_slot" => batch.end_slot -1, // The -1 shows inclusive blocks
"id" => *batch.id,
"start_slot" => batch.start_slot(),
"end_slot" => batch.end_slot -1, // The -1 shows inclusive blocks
"batch_epoch" => batch.start_epoch,
"peer" => format!("{}", batch.current_peer));
// send the batch
self.send_batch(network, batch);
@@ -770,22 +764,15 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
return None;
}
// One is added to the start slot to begin one slot after the epoch boundary
let batch_start_slot = self
.start_epoch
.start_slot(slots_per_epoch)
.saturating_add(1u64)
+ self.to_be_downloaded_id.saturating_sub(1) * blocks_per_batch;
// don't request batches beyond the target head slot
if batch_start_slot > self.target_head_slot {
if self.to_be_downloaded.start_slot(slots_per_epoch) > self.target_head_slot {
return None;
}
// truncate the batch to the epoch containing the target head of the chain
let batch_end_slot = std::cmp::min(
// request either a batch containing the max number of blocks per batch
batch_start_slot + blocks_per_batch,
self.to_be_downloaded.start_slot(slots_per_epoch) + blocks_per_batch + 1,
// or a batch of one epoch of blocks, which contains the `target_head_slot`
self.target_head_slot
.saturating_add(slots_per_epoch)
@@ -793,28 +780,9 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
.start_slot(slots_per_epoch),
);
let batch_id = self.to_be_downloaded_id;
// Find the next batch id. The largest of the next sequential id, or the next uncompleted
// id
let max_completed_id = self
.completed_batches
.iter()
.last()
.map(|x| x.id.0)
.unwrap_or_else(|| 0);
// TODO: Check if this is necessary
self.to_be_downloaded_id = BatchId(std::cmp::max(
self.to_be_downloaded_id.0 + 1,
max_completed_id + 1,
));
Some(Batch::new(
batch_id,
batch_start_slot,
batch_end_slot,
peer_id,
))
let batch = Some(Batch::new(self.to_be_downloaded, batch_end_slot, peer_id));
self.to_be_downloaded += EPOCHS_PER_BATCH;
batch
}
/// Requests the provided batch from the provided peer.
@@ -832,14 +800,13 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
}
Err(e) => {
warn!(self.log, "Batch request failed";
"chain_id" => self.id,
"start_slot" => batch.start_slot,
"end_slot" => batch.end_slot -1, // The -1 shows inclusive blocks
"id" => *batch.id,
"peer" => format!("{}", batch.current_peer),
"retries" => batch.retries,
"error" => e,
"re-processes" => batch.reprocess_retries);
"start_slot" => batch.start_slot(),
"end_slot" => batch.end_slot -1, // The -1 shows inclusive blocks
"start_epoch" => batch.start_epoch,
"peer" => batch.current_peer.to_string(),
"retries" => batch.retries,
"error" => e,
"re-processes" => batch.reprocess_retries);
self.failed_batch(network, batch);
}
}


@@ -9,12 +9,15 @@ use crate::sync::network_context::SyncNetworkContext;
use crate::sync::PeerSyncInfo;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::{types::SyncState, NetworkGlobals, PeerId};
use slog::{debug, error, info};
use slog::{debug, error, info, o};
use std::sync::Arc;
use tokio::sync::mpsc;
use types::EthSpec;
use types::{Epoch, Hash256, Slot};
/// The number of head syncing chains to sync at a time.
const PARALLEL_HEAD_CHAINS: usize = 2;
/// The state of the long range/batch sync.
#[derive(Clone)]
pub enum RangeSyncState {
@@ -205,8 +208,9 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
/// Updates the state of the chain collection.
///
/// This removes any out-dated chains, swaps to any higher priority finalized chains and
/// updates the state of the collection.
pub fn update_finalized(&mut self, network: &mut SyncNetworkContext<T::EthSpec>) {
/// updates the state of the collection. This starts head chains syncing if any are required to
/// do so.
pub fn update(&mut self, network: &mut SyncNetworkContext<T::EthSpec>) {
let local_epoch = {
let local = match PeerSyncInfo::from_chain(&self.beacon_chain) {
Some(local) => local,
@@ -222,9 +226,25 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
local.finalized_epoch
};
// Remove any outdated finalized chains
// Remove any outdated finalized/head chains
self.purge_outdated_chains(network);
// Choose the best finalized chain if one needs to be selected.
self.update_finalized_chains(network, local_epoch);
if self.finalized_syncing_index().is_none() {
// Handle head syncing chains if there are no finalized chains left.
self.update_head_chains(network, local_epoch);
}
}
/// This looks at all current finalized chains and decides if a new chain should be prioritised
/// or not.
fn update_finalized_chains(
&mut self,
network: &mut SyncNetworkContext<T::EthSpec>,
local_epoch: Epoch,
) {
// Check if any chains become the new syncing chain
if let Some(index) = self.finalized_syncing_index() {
// There is a current finalized chain syncing
@@ -269,32 +289,92 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
head_root: chain.target_head_root,
};
self.state = state;
} else {
// There are no finalized chains, update the state.
if self.head_chains.is_empty() {
self.state = RangeSyncState::Idle;
} else {
// for the syncing API, we find the minimal start_slot and the maximum
// target_slot of all head chains to report back.
let (min_epoch, max_slot) = self.head_chains.iter().fold(
(Epoch::from(0u64), Slot::from(0u64)),
|(min, max), chain| {
(
std::cmp::min(min, chain.start_epoch),
std::cmp::max(max, chain.target_head_slot),
)
},
);
let head_state = RangeSyncState::Head {
start_slot: min_epoch.start_slot(T::EthSpec::slots_per_epoch()),
head_slot: max_slot,
};
self.state = head_state;
}
}
}
/// Start syncing any head chains if required.
fn update_head_chains(
&mut self,
network: &mut SyncNetworkContext<T::EthSpec>,
local_epoch: Epoch,
) {
// There are no finalized chains, update the state.
if self.head_chains.is_empty() {
self.state = RangeSyncState::Idle;
return;
}
let mut currently_syncing = self
.head_chains
.iter()
.filter(|chain| chain.is_syncing())
.count();
let mut not_syncing = self.head_chains.len() - currently_syncing;
// Find all head chains that are not currently syncing ordered by peer count.
while currently_syncing <= PARALLEL_HEAD_CHAINS && not_syncing > 0 {
// Find the chain with the most peers and start syncing
if let Some((_index, chain)) = self
.head_chains
.iter_mut()
.filter(|chain| !chain.is_syncing())
.enumerate()
.max_by_key(|(_index, chain)| chain.peer_pool.len())
{
// start syncing this chain
debug!(self.log, "New head chain started syncing"; "new_target_root" => format!("{}", chain.target_head_root), "new_end_slot" => chain.target_head_slot, "new_start_epoch"=> chain.start_epoch);
chain.start_syncing(network, local_epoch);
}
// update variables
currently_syncing = self
.head_chains
.iter()
.filter(|chain| chain.is_syncing())
.count();
not_syncing = self.head_chains.len() - currently_syncing;
}
// for the syncing API, we find the minimal start_slot and the maximum
// target_slot of all head chains to report back.
let (min_epoch, max_slot) = self
.head_chains
.iter()
.filter(|chain| chain.is_syncing())
.fold(
(Epoch::from(0u64), Slot::from(0u64)),
|(min, max), chain| {
(
std::cmp::min(min, chain.start_epoch),
std::cmp::max(max, chain.target_head_slot),
)
},
);
let head_state = RangeSyncState::Head {
start_slot: min_epoch.start_slot(T::EthSpec::slots_per_epoch()),
head_slot: max_slot,
};
self.state = head_state;
}
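The head-chain scheduling above keeps at most `PARALLEL_HEAD_CHAINS` chains syncing, always starting the idle chain with the most peers next. A simplified sketch of that policy (using a strict `<` bound for clarity; the surrounding bookkeeping and network plumbing are omitted):

```rust
// Assumed limit, matching the constant introduced in the diff.
const PARALLEL_HEAD_CHAINS: usize = 2;

#[derive(Debug)]
struct HeadChain {
    peers: usize,
    syncing: bool,
}

// Start idle chains, best-connected first, until the parallel limit is hit
// or no idle chains remain.
fn start_head_chains(chains: &mut [HeadChain]) {
    while chains.iter().filter(|c| c.syncing).count() < PARALLEL_HEAD_CHAINS {
        match chains
            .iter_mut()
            .filter(|c| !c.syncing)
            .max_by_key(|c| c.peers)
        {
            Some(best) => best.syncing = true,
            None => break, // nothing left to start
        }
    }
}

fn main() {
    let mut chains = vec![
        HeadChain { peers: 3, syncing: false },
        HeadChain { peers: 7, syncing: false },
        HeadChain { peers: 1, syncing: false },
    ];
    start_head_chains(&mut chains);
    // Only the two best-connected chains are syncing.
    assert!(chains[0].syncing);
    assert!(chains[1].syncing);
    assert!(!chains[2].syncing);
}
```

Prioritising by peer count means the chains most likely to make progress are downloaded first, while the cap bounds the processing cost in a chaotic network with many candidate head chains.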
/// This is called once a head chain has completed syncing. It removes all non-syncing head
/// chains and re-statuses their peers.
pub fn clear_head_chains(&mut self, network: &mut SyncNetworkContext<T::EthSpec>) {
let log_ref = &self.log;
self.head_chains.retain(|chain| {
if !chain.is_syncing() {
debug!(log_ref, "Removing old head chain"; "start_epoch" => chain.start_epoch, "end_slot" => chain.target_head_slot);
chain.status_peers(network);
false
} else {
true
}
});
}
/// Add a new finalized chain to the collection.
pub fn new_finalized_chain(
&mut self,
@@ -313,7 +393,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
peer_id,
beacon_processor_send,
self.beacon_chain.clone(),
self.log.clone(),
self.log.new(o!("chain" => chain_id)),
));
}
@@ -321,7 +401,6 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
#[allow(clippy::too_many_arguments)]
pub fn new_head_chain(
&mut self,
network: &mut SyncNetworkContext<T::EthSpec>,
remote_finalized_epoch: Epoch,
target_head: Hash256,
target_slot: Slot,
@@ -336,7 +415,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
self.head_chains.retain(|chain| !chain.peer_pool.is_empty());
let chain_id = rand::random();
let mut new_head_chain = SyncingChain::new(
let new_head_chain = SyncingChain::new(
chain_id,
remote_finalized_epoch,
target_slot,
@@ -346,8 +425,6 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
self.beacon_chain.clone(),
self.log.clone(),
);
// All head chains can sync simultaneously
new_head_chain.start_syncing(network, remote_finalized_epoch);
self.head_chains.push(new_head_chain);
}
@@ -511,7 +588,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
debug!(self.log, "Chain was removed"; "start_epoch" => chain.start_epoch, "end_slot" => chain.target_head_slot);
// update the state
self.update_finalized(network);
self.update(network);
}
/// Returns the index of finalized chain that is currently syncing. Returns `None` if no


@@ -8,6 +8,5 @@ mod range;
mod sync_type;
pub use batch::Batch;
pub use batch::BatchId;
pub use chain::{ChainId, EPOCHS_PER_BATCH};
pub use range::RangeSync;


@@ -42,7 +42,6 @@
use super::chain::{ChainId, ProcessingResult};
use super::chain_collection::{ChainCollection, RangeSyncState};
use super::sync_type::RangeSyncType;
use super::BatchId;
use crate::beacon_processor::WorkEvent as BeaconWorkEvent;
use crate::sync::network_context::SyncNetworkContext;
use crate::sync::BatchProcessResult;
@@ -54,7 +53,7 @@ use slog::{debug, error, trace};
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::mpsc;
use types::{EthSpec, SignedBeaconBlock};
use types::{Epoch, EthSpec, SignedBeaconBlock};
/// The primary object dealing with long range/batch syncing. This contains all the active and
/// non-active chains that need to be processed before the syncing is considered complete. This
@@ -161,13 +160,13 @@ impl<T: BeaconChainTypes> RangeSync<T> {
.chains
.get_finalized_mut(remote_info.finalized_root, remote_finalized_slot)
{
debug!(self.log, "Finalized chain exists, adding peer"; "peer_id" => format!("{:?}", peer_id), "target_root" => format!("{}", chain.target_head_root), "end_slot" => chain.target_head_slot, "start_epoch"=> chain.start_epoch);
debug!(self.log, "Finalized chain exists, adding peer"; "peer_id" => peer_id.to_string(), "target_root" => chain.target_head_root.to_string(), "target_slot" => chain.target_head_slot);
// add the peer to the chain's peer pool
chain.add_peer(network, peer_id);
// check if the new peer's addition will favour a new syncing chain.
self.chains.update_finalized(network);
self.chains.update(network);
// update the global sync state if necessary
self.chains.update_sync_state();
} else {
@@ -182,7 +181,7 @@ impl<T: BeaconChainTypes> RangeSync<T> {
peer_id,
self.beacon_processor_send.clone(),
);
self.chains.update_finalized(network);
self.chains.update(network);
// update the global sync state
self.chains.update_sync_state();
}
@@ -222,7 +221,6 @@ impl<T: BeaconChainTypes> RangeSync<T> {
debug!(self.log, "Creating a new syncing head chain"; "head_root" => format!("{}",remote_info.head_root), "start_epoch" => start_epoch, "head_slot" => remote_info.head_slot, "peer_id" => format!("{:?}", peer_id));
self.chains.new_head_chain(
network,
start_epoch,
remote_info.head_root,
remote_info.head_slot,
@@ -230,7 +228,7 @@ impl<T: BeaconChainTypes> RangeSync<T> {
self.beacon_processor_send.clone(),
);
}
self.chains.update_finalized(network);
self.chains.update(network);
self.chains.update_sync_state();
}
}
@@ -271,7 +269,7 @@ impl<T: BeaconChainTypes> RangeSync<T> {
&mut self,
network: &mut SyncNetworkContext<T::EthSpec>,
chain_id: ChainId,
batch_id: BatchId,
epoch: Epoch,
downloaded_blocks: Vec<SignedBeaconBlock<T::EthSpec>>,
result: BatchProcessResult,
) {
@@ -279,19 +277,13 @@ impl<T: BeaconChainTypes> RangeSync<T> {
let mut downloaded_blocks = Some(downloaded_blocks);
match self.chains.finalized_request(|chain| {
chain.on_batch_process_result(
network,
chain_id,
batch_id,
&mut downloaded_blocks,
&result,
)
chain.on_batch_process_result(network, chain_id, epoch, &mut downloaded_blocks, &result)
}) {
Some((index, ProcessingResult::RemoveChain)) => {
let chain = self.chains.remove_finalized_chain(index);
debug!(self.log, "Finalized chain removed"; "start_epoch" => chain.start_epoch, "end_slot" => chain.target_head_slot);
// update the state of the collection
self.chains.update_finalized(network);
self.chains.update(network);
// the chain is complete, re-status its peers
chain.status_peers(network);
@@ -319,7 +311,7 @@ impl<T: BeaconChainTypes> RangeSync<T> {
chain.on_batch_process_result(
network,
chain_id,
batch_id,
epoch,
&mut downloaded_blocks,
&result,
)
@@ -330,8 +322,12 @@ impl<T: BeaconChainTypes> RangeSync<T> {
// the chain is complete, re-status its peers and remove it
chain.status_peers(network);
// Remove non-syncing head chains and re-status the peers
// This removes a build-up of potentially duplicate head chains. Any
// legitimate head chains will be re-established
self.chains.clear_head_chains(network);
// update the state of the collection
self.chains.update_finalized(network);
self.chains.update(network);
// update the global state and log any change
self.chains.update_sync_state();
}
@@ -339,7 +335,7 @@ impl<T: BeaconChainTypes> RangeSync<T> {
None => {
// This can happen if a chain gets purged due to being out of date whilst a
// batch process is in progress.
debug!(self.log, "No chains match the block processing id"; "id" => *batch_id);
debug!(self.log, "No chains match the block processing id"; "batch_epoch" => epoch, "chain_id" => chain_id);
}
}
}
@@ -360,7 +356,7 @@ impl<T: BeaconChainTypes> RangeSync<T> {
self.remove_peer(network, peer_id);
// update the state of the collection
self.chains.update_finalized(network);
self.chains.update(network);
// update the global state and inform the user
self.chains.update_sync_state();
}

View File

@@ -260,6 +260,6 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
)
.value_name("NUM_SLOTS")
.takes_value(true)
.default_value("320")
.default_value("700")
)
}

View File

@@ -1,6 +1,6 @@
[package]
name = "boot_node"
version = "0.2.4"
version = "0.2.5"
authors = ["Sigma Prime <contact@sigmaprime.io>"]
edition = "2018"

View File

@@ -310,6 +310,15 @@ fn is_voting_keystore(file_name: &str) -> bool {
return true;
}
// The format exported by Prysm. I don't have a reference for this, but it was shared via
// Discord to Paul H.
if Regex::new("keystore-[0-9]+\\.json")
.expect("regex is valid")
.is_match(file_name)
{
return true;
}
false
}
@@ -318,8 +327,12 @@ mod tests {
use super::*;
#[test]
fn voting_keystore_filename() {
fn voting_keystore_filename_lighthouse() {
assert!(is_voting_keystore(VOTING_KEYSTORE_FILE));
}
#[test]
fn voting_keystore_filename_launchpad() {
assert!(!is_voting_keystore("cats"));
assert!(!is_voting_keystore(&format!("a{}", VOTING_KEYSTORE_FILE)));
assert!(!is_voting_keystore(&format!("{}b", VOTING_KEYSTORE_FILE)));
@@ -337,4 +350,14 @@ mod tests {
"keystore-m_12381_3600_1_0-1593476250.json"
));
}
#[test]
fn voting_keystore_filename_prysm() {
assert!(is_voting_keystore("keystore-0.json"));
assert!(is_voting_keystore("keystore-1.json"));
assert!(is_voting_keystore("keystore-101238259.json"));
assert!(!is_voting_keystore("keystore-.json"));
assert!(!is_voting_keystore("keystore-0a.json"));
assert!(!is_voting_keystore("keystore-cats.json"));
}
}

View File

@@ -10,7 +10,7 @@ use target_info::Target;
/// `Lighthouse/v0.2.0-1419501f2+`
pub const VERSION: &str = git_version!(
args = ["--always", "--dirty=+"],
prefix = "Lighthouse/v0.2.4-",
prefix = "Lighthouse/v0.2.5-",
fallback = "unknown"
);

View File

@@ -0,0 +1,8 @@
[package]
name = "lru_cache"
version = "0.1.0"
authors = ["Sigma Prime <contact@sigmaprime.io>"]
edition = "2018"
[dependencies]
fnv = "1.0.7"

View File

@@ -0,0 +1,7 @@
//! A library providing fast and efficient LRU caches that do not update elements on access.
mod space;
mod time;
pub use space::LRUCache;
pub use time::LRUTimeCache;

View File

@@ -0,0 +1,93 @@
//! This implements a size-based LRU cache for fast checking of duplicates.
use fnv::FnvHashSet;
use std::collections::VecDeque;
/// A cache that stores keys up to a maximum size. Does not update elements for efficiency.
pub struct LRUCache<Key>
where
Key: Eq + std::hash::Hash + Clone,
{
/// The duplicate cache.
map: FnvHashSet<Key>,
/// An ordered list of keys by insertion order.
list: VecDeque<Key>,
/// The max size of the cache.
size: usize,
}
impl<Key> LRUCache<Key>
where
Key: Eq + std::hash::Hash + Clone,
{
pub fn new(size: usize) -> Self {
LRUCache {
map: FnvHashSet::default(),
list: VecDeque::new(),
size,
}
}
/// Determines if the key is in the cache.
pub fn contains(&self, key: &Key) -> bool {
self.map.contains(key)
}
/// Inserts a new element, evicting the oldest elements if the cache exceeds its size.
///
/// If the key was not present this returns `true`. If the value was already present this
/// returns `false`.
pub fn insert(&mut self, key: Key) -> bool {
// check the cache before removing elements
let result = self.map.insert(key.clone());
// add the new key to the list, if it doesn't already exist.
if result {
self.list.push_back(key);
}
// remove any overflow keys
self.update();
result
}
/// Evicts the oldest elements until the cache is within its size limit.
fn update(&mut self) {
// remove the oldest keys that exceed the size limit
for _ in 0..self.map.len().saturating_sub(self.size) {
if let Some(key) = self.list.pop_front() {
self.map.remove(&key);
}
}
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn cache_added_entries_exist() {
let mut cache = LRUCache::new(5);
cache.insert("t");
cache.insert("e");
// Should report that 't' and 'e' already exist
assert!(!cache.insert("t"));
assert!(!cache.insert("e"));
}
#[test]
fn cache_entries_get_removed() {
let mut cache = LRUCache::new(2);
cache.insert("t");
assert!(!cache.insert("t"));
cache.insert("e");
assert!(!cache.insert("e"));
// add another element to clear the first key
cache.insert("s");
assert!(!cache.insert("s"));
// should be removed from the cache
assert!(cache.insert("t"));
}
}

View File

@@ -0,0 +1,126 @@
//! This implements a time-based LRU cache for fast checking of duplicates.
use fnv::FnvHashSet;
use std::collections::VecDeque;
use std::time::{Duration, Instant};
struct Element<Key> {
/// The key being inserted.
key: Key,
/// The instant the key was inserted.
inserted: Instant,
}
pub struct LRUTimeCache<Key> {
/// The duplicate cache.
map: FnvHashSet<Key>,
/// An ordered list of keys by insert time.
list: VecDeque<Element<Key>>,
/// The time elements remain in the cache.
ttl: Duration,
}
impl<Key> LRUTimeCache<Key>
where
Key: Eq + std::hash::Hash + Clone,
{
pub fn new(ttl: Duration) -> Self {
LRUTimeCache {
map: FnvHashSet::default(),
list: VecDeque::new(),
ttl,
}
}
/// Inserts a new element and removes any expired elements.
///
/// If the key was not present this returns `true`. If the value was already present this
/// returns `false`.
pub fn insert_update(&mut self, key: Key) -> bool {
// check the cache before removing elements
let result = self.map.insert(key.clone());
let now = Instant::now();
// remove any expired results
while let Some(element) = self.list.pop_front() {
if element.inserted + self.ttl > now {
self.list.push_front(element);
break;
}
self.map.remove(&element.key);
}
// add the new key to the list, if it doesn't already exist.
if result {
self.list.push_back(Element { key, inserted: now });
}
result
}
/// Inserts a new element without removing expired elements.
///
/// If the key was not present this returns `true`. If the value was already present this
/// returns `false`.
pub fn insert(&mut self, key: Key) -> bool {
// check the cache before removing elements
let result = self.map.insert(key.clone());
// add the new key to the list, if it doesn't already exist.
if result {
self.list.push_back(Element {
key,
inserted: Instant::now(),
});
}
result
}
/// Removes any expired elements from the cache.
pub fn update(&mut self) {
let now = Instant::now();
// remove any expired results
while let Some(element) = self.list.pop_front() {
if element.inserted + self.ttl > now {
self.list.push_front(element);
break;
}
self.map.remove(&element.key);
}
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn cache_added_entries_exist() {
let mut cache = LRUTimeCache::new(Duration::from_secs(10));
cache.insert("t");
cache.insert("e");
// Should report that 't' and 'e' already exist
assert!(!cache.insert("t"));
assert!(!cache.insert("e"));
}
#[test]
fn cache_entries_expire() {
let mut cache = LRUTimeCache::new(Duration::from_millis(100));
cache.insert_update("t");
assert!(!cache.insert_update("t"));
cache.insert_update("e");
assert!(!cache.insert_update("t"));
assert!(!cache.insert_update("e"));
// sleep until cache expiry
std::thread::sleep(Duration::from_millis(101));
// add another element to clear previous cache
cache.insert_update("s");
// should be removed from the cache
assert!(cache.insert_update("t"));
}
}

View File

@@ -24,10 +24,14 @@ use serde_repr::*;
pub struct JsonKeystore {
pub crypto: Crypto,
pub uuid: Uuid,
pub path: String,
/// EIP-2335 does not declare this field as optional, but Prysm omits it, so we must
/// accept its absence.
pub path: Option<String>,
pub pubkey: String,
pub version: Version,
pub description: Option<String>,
/// Not part of EIP-2335, but `ethdo` and Prysm have adopted it anyway so we must support it.
pub name: Option<String>,
}
/// Version for `JsonKeystore`.

View File

@@ -172,10 +172,11 @@ impl Keystore {
},
},
uuid,
path,
path: Some(path),
pubkey: keypair.pk.to_hex_string()[2..].to_string(),
version: Version::four(),
description: None,
name: None,
},
})
}
@@ -218,8 +219,8 @@ impl Keystore {
/// Returns the path for the keystore.
///
/// Note: the path is not validated, it is simply whatever string the keystore provided.
pub fn path(&self) -> &str {
&self.json.path
pub fn path(&self) -> Option<String> {
self.json.path.clone()
}
/// Returns the pubkey for the keystore.

View File

@@ -64,7 +64,7 @@ fn eip2335_test_vector_scrypt() {
Uuid::parse_str("1d85ae20-35c5-4611-98e8-aa14a633906f").unwrap(),
"uuid"
);
assert_eq!(keystore.path(), "", "path");
assert_eq!(keystore.path().unwrap(), "", "path");
}
#[test]
@@ -108,5 +108,5 @@ fn eip2335_test_vector_pbkdf() {
Uuid::parse_str("64625def-3331-4eea-ab6f-782f3ed16a83").unwrap(),
"uuid"
);
assert_eq!(keystore.path(), "m/12381/60/0/0", "path");
assert_eq!(keystore.path().unwrap(), "m/12381/60/0/0", "path");
}

View File

@@ -841,10 +841,7 @@ fn missing_path() {
}
"#;
match Keystore::from_json_str(&vector) {
Err(Error::InvalidJson(_)) => {}
_ => panic!("expected invalid json error"),
}
assert!(Keystore::from_json_str(&vector).is_ok());
}
#[test]
@@ -1010,3 +1007,43 @@ fn pbkdf2_missing_parameter() {
_ => panic!("expected invalid json error"),
}
}
#[test]
fn name_field() {
let vector = r#"
{
"crypto": {
"kdf": {
"function": "scrypt",
"params": {
"dklen": 32,
"n": 262144,
"p": 1,
"r": 8,
"salt": "d4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3"
},
"message": ""
},
"checksum": {
"function": "sha256",
"params": {},
"message": "149aafa27b041f3523c53d7acba1905fa6b1c90f9fef137568101f44b531a3cb"
},
"cipher": {
"function": "aes-128-ctr",
"params": {
"iv": "264daa3f303d7259501c93d997d84fe6"
},
"message": "54ecc8863c0550351eee5720f3be6a5d4a016025aa91cd6436cfec938d6a8d30"
}
},
"pubkey": "9612d7a727c9d0a22e185a1c768478dfe919cada9266988cb32359c11f2b7b27f4ae4040902382ae2910c15e2b420d07",
"uuid": "1d85ae20-35c5-4611-98e8-aa14a633906f",
"path": "",
"version": 4,
"name": "cats"
}
"#;
assert!(Keystore::from_json_str(&vector).is_ok());
}

View File

@@ -225,13 +225,13 @@ fn key_derivation_from_seed() {
.expect("should generate keystores");
assert_eq!(
keystores.voting.path(),
keystores.voting.path().unwrap(),
format!("m/12381/3600/{}/0/0", i),
"voting path should match"
);
assert_eq!(
keystores.withdrawal.path(),
keystores.withdrawal.path().unwrap(),
format!("m/12381/3600/{}/0", i),
"withdrawal path should match"
);

View File

@@ -1,7 +1,7 @@
[package]
name = "lcli"
description = "Lighthouse CLI (modeled after zcli)"
version = "0.2.2"
version = "0.2.5"
authors = ["Paul Hauner <paul@paulhauner.com>"]
edition = "2018"

View File

@@ -1,6 +1,6 @@
[package]
name = "lighthouse"
version = "0.2.4"
version = "0.2.5"
authors = ["Sigma Prime <contact@sigmaprime.io>"]
edition = "2018"

View File

@@ -1,6 +1,6 @@
[package]
name = "validator_client"
version = "0.2.4"
version = "0.2.5"
authors = ["Paul Hauner <paul@paulhauner.com>", "Age Manning <Age@AgeManning.com>", "Luke Anderson <luke@lukeanderson.com.au>"]
edition = "2018"