Upgrade discovery and restructure task execution (#1693)

* Initial rebase

* Remove old code

* Correct release tests

* Rebase commit

* Remove eth2-testnet dep on eth2libp2p

* Remove crates lost in rebase

* Remove unused dep
This commit is contained in:
Age Manning
2020-10-05 18:45:54 +11:00
committed by GitHub
parent bcb629564a
commit 240181e840
35 changed files with 273 additions and 221 deletions

View File

@@ -123,6 +123,7 @@ impl Default for Config {
.request_retries(1)
.enr_peer_update_min(10)
.query_parallelism(5)
.disable_report_discovered_peers()
.query_timeout(Duration::from_secs(30))
.query_peer_timeout(Duration::from_secs(2))
.ip_limit() // limits /24 IP's in buckets.

View File

@@ -144,12 +144,12 @@ pub fn build_enr<T: EthSpec>(
let mut builder = create_enr_builder_from_config(config);
// set the `eth2` field on our ENR
builder.add_value(ETH2_ENR_KEY.into(), enr_fork_id.as_ssz_bytes());
builder.add_value(ETH2_ENR_KEY, &enr_fork_id.as_ssz_bytes());
// set the "attnets" field on our ENR
let bitfield = BitVector::<T::SubnetBitfieldLength>::new();
builder.add_value(BITFIELD_ENR_KEY.into(), bitfield.as_ssz_bytes());
builder.add_value(BITFIELD_ENR_KEY, &bitfield.as_ssz_bytes());
builder
.build(enr_key)

View File

@@ -365,7 +365,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
/// If the external address needs to be modified, use `update_enr_udp_socket.
pub fn update_enr_tcp_port(&mut self, port: u16) -> Result<(), String> {
self.discv5
.enr_insert("tcp", port.to_be_bytes().into())
.enr_insert("tcp", &port.to_be_bytes())
.map_err(|e| format!("{:?}", e))?;
// replace the global version
@@ -383,18 +383,18 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
match socket_addr {
SocketAddr::V4(socket) => {
self.discv5
.enr_insert("ip", socket.ip().octets().into())
.enr_insert("ip", &socket.ip().octets())
.map_err(|e| format!("{:?}", e))?;
self.discv5
.enr_insert("udp", socket.port().to_be_bytes().into())
.enr_insert("udp", &socket.port().to_be_bytes())
.map_err(|e| format!("{:?}", e))?;
}
SocketAddr::V6(socket) => {
self.discv5
.enr_insert("ip6", socket.ip().octets().into())
.enr_insert("ip6", &socket.ip().octets())
.map_err(|e| format!("{:?}", e))?;
self.discv5
.enr_insert("udp6", socket.port().to_be_bytes().into())
.enr_insert("udp6", &socket.port().to_be_bytes())
.map_err(|e| format!("{:?}", e))?;
}
}
@@ -439,7 +439,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
// insert the bitfield into the ENR record
self.discv5
.enr_insert(BITFIELD_ENR_KEY, current_bitfield.as_ssz_bytes())
.enr_insert(BITFIELD_ENR_KEY, &current_bitfield.as_ssz_bytes())
.map_err(|e| format!("{:?}", e))?;
// replace the global version
@@ -468,7 +468,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
let _ = self
.discv5
.enr_insert(ETH2_ENR_KEY, enr_fork_id.as_ssz_bytes())
.enr_insert(ETH2_ENR_KEY, &enr_fork_id.as_ssz_bytes())
.map_err(|e| {
warn!(
self.log,
@@ -858,7 +858,10 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
// Still awaiting the event stream, poll it
if let Poll::Ready(event_stream) = fut.poll_unpin(cx) {
match event_stream {
Ok(stream) => self.event_stream = EventStream::Present(stream),
Ok(stream) => {
debug!(self.log, "Discv5 event stream ready");
self.event_stream = EventStream::Present(stream);
}
Err(e) => {
slog::crit!(self.log, "Discv5 event stream failed"; "error" => e.to_string());
self.event_stream = EventStream::InActive;

View File

@@ -59,7 +59,7 @@ pub struct Service<TSpec: EthSpec> {
impl<TSpec: EthSpec> Service<TSpec> {
pub async fn new(
executor: environment::TaskExecutor,
executor: task_executor::TaskExecutor,
config: &NetworkConfig,
enr_fork_id: EnrForkId,
log: &slog::Logger,
@@ -109,7 +109,7 @@ impl<TSpec: EthSpec> Service<TSpec> {
Behaviour::new(&local_keypair, config, network_globals.clone(), &log).await?;
// use the executor for libp2p
struct Executor(environment::TaskExecutor);
struct Executor(task_executor::TaskExecutor);
impl libp2p::core::Executor for Executor {
fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
self.0.spawn(f, "libp2p");