Server sent events (#1920)

## Issue Addressed

Resolves #1434 (this is the last major feature in the standard spec. There are only a couple of places we may be off-spec due to recent spec changes or ongoing discussion)
Partly addresses #1669
 
## Proposed Changes

- remove the websocket server
- remove the `TeeEventHandler` and `NullEventHandler` 
- add server sent events according to the eth2 API spec

## Additional Info

This is according to the currently unmerged PR here: https://github.com/ethereum/eth2.0-APIs/pull/117


Co-authored-by: realbigsean <seananderson33@gmail.com>
This commit is contained in:
realbigsean
2020-12-04 00:18:58 +00:00
parent 2b5c0df9e5
commit fdfb81a74a
28 changed files with 969 additions and 766 deletions

View File

@@ -7,7 +7,7 @@ edition = "2018"
[dependencies]
warp = { git = "https://github.com/sigp/warp ", branch = "lighthouse" }
serde = { version = "1.0.116", features = ["derive"] }
tokio = { version = "0.3.2", features = ["macros"] }
tokio = { version = "0.3.2", features = ["macros","stream","sync"] }
parking_lot = "0.11.0"
types = { path = "../../consensus/types" }
hex = "0.4.2"
@@ -26,6 +26,7 @@ warp_utils = { path = "../../common/warp_utils" }
slot_clock = { path = "../../common/slot_clock" }
eth2_ssz = { path = "../../consensus/ssz" }
bs58 = "0.3.1"
futures = "0.3.8"
[dev-dependencies]
store = { path = "../store" }

View File

@@ -17,7 +17,7 @@ use beacon_chain::{
};
use beacon_proposer_cache::BeaconProposerCache;
use block_id::BlockId;
use eth2::types::{self as api_types, ValidatorId};
use eth2::types::{self as api_types, EventKind, ValidatorId};
use eth2_libp2p::{types::SyncState, EnrExt, NetworkGlobals, PeerId, PubsubMessage};
use lighthouse_version::version_with_platform;
use network::NetworkMessage;
@@ -33,6 +33,8 @@ use std::convert::TryInto;
use std::future::Future;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::sync::Arc;
use tokio::stream::{StreamExt, StreamMap};
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::mpsc::UnboundedSender;
use types::{
Attestation, AttestationDuty, AttesterSlashing, CloneConfig, CommitteeCache, Epoch, EthSpec,
@@ -40,7 +42,9 @@ use types::{
SignedBeaconBlock, SignedVoluntaryExit, Slot, YamlConfig,
};
use warp::http::StatusCode;
use warp::{http::Response, Filter};
use warp::sse::ServerSentEvent;
use warp::{http::Response, Filter, Stream};
use warp_utils::reject::ServerSentEventError;
use warp_utils::task::{blocking_json_task, blocking_task};
const API_PREFIX: &str = "eth";
@@ -1571,15 +1575,37 @@ pub fn serve<T: BeaconChainTypes>(
}
if epoch == current_epoch {
let dependent_root_slot = current_epoch
.start_slot(T::EthSpec::slots_per_epoch()) - 1;
let dependent_root = if dependent_root_slot > chain.best_slot().map_err(warp_utils::reject::beacon_chain_error)? {
chain.head_beacon_block_root().map_err(warp_utils::reject::beacon_chain_error)?
} else {
chain
.root_at_slot(dependent_root_slot)
.map_err(warp_utils::reject::beacon_chain_error)?
.unwrap_or(chain.genesis_block_root)
};
beacon_proposer_cache
.lock()
.get_proposers(&chain, epoch)
.map(api_types::GenericResponse::from)
.map(|duties| api_types::DutiesResponse{ data: duties, dependent_root} )
} else {
let state =
StateId::slot(epoch.start_slot(T::EthSpec::slots_per_epoch()))
.state(&chain)?;
let dependent_root_slot = state.current_epoch()
.start_slot(T::EthSpec::slots_per_epoch()) - 1;
let dependent_root = if dependent_root_slot > chain.best_slot().map_err(warp_utils::reject::beacon_chain_error)? {
chain.head_beacon_block_root().map_err(warp_utils::reject::beacon_chain_error)?
} else {
chain
.root_at_slot(dependent_root_slot)
.map_err(warp_utils::reject::beacon_chain_error)?
.unwrap_or(chain.genesis_block_root)
};
epoch
.slot_iter(T::EthSpec::slots_per_epoch())
.map(|slot| {
@@ -1604,7 +1630,13 @@ pub fn serve<T: BeaconChainTypes>(
})
})
.collect::<Result<Vec<api_types::ProposerData>, _>>()
.map(api_types::GenericResponse::from)
.map(|duties| {
api_types::DutiesResponse{
dependent_root,
data: duties,
}
})
}
})
},
@@ -1781,9 +1813,9 @@ pub fn serve<T: BeaconChainTypes>(
//
// The idea is to stop historical requests from washing out the cache on the
// beacon chain, whilst allowing a VC to request duties quickly.
let duties = if epoch == current_epoch {
let (duties, dependent_root) = if epoch == current_epoch {
// Fast path.
pubkeys
let duties = pubkeys
.into_iter()
// Exclude indices which do not represent a known public key and a
// validator duty.
@@ -1796,7 +1828,26 @@ pub fn serve<T: BeaconChainTypes>(
.map(|duty| convert(i, pubkey, duty)),
)
})
.collect::<Result<Vec<_>, warp::Rejection>>()?
.collect::<Result<Vec<_>, warp::Rejection>>()?;
let dependent_root_slot =
(epoch - 1).start_slot(T::EthSpec::slots_per_epoch()) - 1;
let dependent_root = if dependent_root_slot
> chain
.best_slot()
.map_err(warp_utils::reject::beacon_chain_error)?
{
chain
.head_beacon_block_root()
.map_err(warp_utils::reject::beacon_chain_error)?
} else {
chain
.root_at_slot(dependent_root_slot)
.map_err(warp_utils::reject::beacon_chain_error)?
.unwrap_or(chain.genesis_block_root)
};
(duties, dependent_root)
} else {
// If the head state is equal to or earlier than the request epoch, use it.
let mut state = chain
@@ -1843,7 +1894,7 @@ pub fn serve<T: BeaconChainTypes>(
state
.build_committee_cache(relative_epoch, &chain.spec)
.map_err(warp_utils::reject::beacon_state_error)?;
pubkeys
let duties = pubkeys
.into_iter()
.filter_map(|(i, pubkey)| {
Some(
@@ -1854,10 +1905,32 @@ pub fn serve<T: BeaconChainTypes>(
.map(|duty| convert(i, pubkey, duty)),
)
})
.collect::<Result<Vec<_>, warp::Rejection>>()?
.collect::<Result<Vec<_>, warp::Rejection>>()?;
let dependent_root_slot =
(epoch - 1).start_slot(T::EthSpec::slots_per_epoch()) - 1;
let dependent_root = if dependent_root_slot
> chain
.best_slot()
.map_err(warp_utils::reject::beacon_chain_error)?
{
chain
.head_beacon_block_root()
.map_err(warp_utils::reject::beacon_chain_error)?
} else {
chain
.root_at_slot(dependent_root_slot)
.map_err(warp_utils::reject::beacon_chain_error)?
.unwrap_or(chain.genesis_block_root)
};
(duties, dependent_root)
};
Ok(api_types::GenericResponse::from(duties))
Ok(api_types::DutiesResponse {
dependent_root,
data: duties,
})
})
},
);
@@ -2190,7 +2263,7 @@ pub fn serve<T: BeaconChainTypes>(
let get_lighthouse_staking = warp::path("lighthouse")
.and(warp::path("staking"))
.and(warp::path::end())
.and(chain_filter)
.and(chain_filter.clone())
.and_then(|chain: Arc<BeaconChain<T>>| {
blocking_json_task(move || {
if chain.eth1_chain.is_some() {
@@ -2205,6 +2278,67 @@ pub fn serve<T: BeaconChainTypes>(
})
});
fn merge_streams<T: EthSpec>(
stream_map: StreamMap<
String,
impl Stream<Item = Result<EventKind<T>, RecvError>> + Unpin + Send + 'static,
>,
) -> impl Stream<Item = Result<impl ServerSentEvent + Send + 'static, ServerSentEventError>>
+ Send
+ 'static {
// Convert messages into Server-Sent Events and return resulting stream.
stream_map.map(move |(topic_name, msg)| match msg {
Ok(data) => Ok((warp::sse::event(topic_name), warp::sse::json(data)).boxed()),
Err(e) => Err(warp_utils::reject::server_sent_event_error(format!(
"{:?}",
e
))),
})
}
let get_events = eth1_v1
.and(warp::path("events"))
.and(warp::path::end())
.and(warp::query::<api_types::EventQuery>())
.and(chain_filter)
.and_then(
|topics: api_types::EventQuery, chain: Arc<BeaconChain<T>>| {
blocking_task(move || {
// for each topic subscribed spawn a new subscription
let mut stream_map = StreamMap::with_capacity(topics.topics.0.len());
if let Some(event_handler) = chain.event_handler.as_ref() {
for topic in topics.topics.0.clone() {
let receiver = match topic {
api_types::EventTopic::Head => event_handler.subscribe_head(),
api_types::EventTopic::Block => event_handler.subscribe_block(),
api_types::EventTopic::Attestation => {
event_handler.subscribe_attestation()
}
api_types::EventTopic::VoluntaryExit => {
event_handler.subscribe_exit()
}
api_types::EventTopic::FinalizedCheckpoint => {
event_handler.subscribe_finalized()
}
};
stream_map.insert(topic.to_string(), Box::pin(receiver.into_stream()));
}
} else {
return Err(warp_utils::reject::custom_server_error(
"event handler was not initialized".to_string(),
));
}
let stream = merge_streams(stream_map);
Ok::<_, warp::Rejection>(warp::sse::reply(
warp::sse::keep_alive().stream(stream),
))
})
},
);
// Define the ultimate set of routes that will be provided to the server.
let routes = warp::get()
.and(
@@ -2253,7 +2387,8 @@ pub fn serve<T: BeaconChainTypes>(
.or(get_lighthouse_eth1_block_cache.boxed())
.or(get_lighthouse_eth1_deposit_cache.boxed())
.or(get_lighthouse_beacon_states_ssz.boxed())
.or(get_lighthouse_staking.boxed()),
.or(get_lighthouse_staking.boxed())
.or(get_events.boxed()),
)
.or(warp::post().and(
post_beacon_blocks

View File

@@ -14,14 +14,17 @@ use eth2_libp2p::{
types::{EnrBitfield, SyncState},
Enr, EnrExt, NetworkGlobals, PeerId,
};
use futures::stream::{Stream, StreamExt};
use http_api::{Config, Context};
use network::NetworkMessage;
use state_processing::per_slot_processing;
use std::convert::TryInto;
use std::iter::Iterator;
use std::net::Ipv4Addr;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::time::Duration;
use tokio_compat_02::FutureExt;
use tree_hash::TreeHash;
use types::{
@@ -33,7 +36,7 @@ type E = MainnetEthSpec;
const SLOTS_PER_EPOCH: u64 = 32;
const VALIDATOR_COUNT: usize = SLOTS_PER_EPOCH as usize;
const CHAIN_LENGTH: u64 = SLOTS_PER_EPOCH * 5;
const CHAIN_LENGTH: u64 = SLOTS_PER_EPOCH * 5 - 1; // Make `next_block` an epoch transition
const JUSTIFIED_EPOCH: u64 = 4;
const FINALIZED_EPOCH: u64 = 3;
const TCP_PORT: u16 = 42;
@@ -131,7 +134,7 @@ impl ApiTester {
assert_eq!(
chain.head_info().unwrap().finalized_checkpoint.epoch,
3,
2,
"precondition: finality"
);
assert_eq!(
@@ -140,7 +143,7 @@ impl ApiTester {
.unwrap()
.current_justified_checkpoint
.epoch,
4,
3,
"precondition: justification"
);
@@ -218,6 +221,111 @@ impl ApiTester {
}
}
pub fn new_from_genesis() -> Self {
let harness = BeaconChainHarness::new(
MainnetEthSpec,
generate_deterministic_keypairs(VALIDATOR_COUNT),
);
harness.advance_slot();
let head = harness.chain.head().unwrap();
let (next_block, _next_state) =
harness.make_block(head.beacon_state.clone(), harness.chain.slot().unwrap());
let attestations = harness
.get_unaggregated_attestations(
&AttestationStrategy::AllValidators,
&head.beacon_state,
head.beacon_block_root,
harness.chain.slot().unwrap(),
)
.into_iter()
.map(|vec| vec.into_iter().map(|(attestation, _subnet_id)| attestation))
.flatten()
.collect::<Vec<_>>();
let attester_slashing = harness.make_attester_slashing(vec![0, 1]);
let proposer_slashing = harness.make_proposer_slashing(2);
let voluntary_exit = harness.make_voluntary_exit(3, harness.chain.epoch().unwrap());
let chain = Arc::new(harness.chain);
let (network_tx, network_rx) = mpsc::unbounded_channel();
let log = null_logger().unwrap();
// Default metadata
let meta_data = MetaData {
seq_number: SEQ_NUMBER,
attnets: EnrBitfield::<MainnetEthSpec>::default(),
};
let enr_key = CombinedKey::generate_secp256k1();
let enr = EnrBuilder::new("v4").build(&enr_key).unwrap();
let enr_clone = enr.clone();
let network_globals = NetworkGlobals::new(enr, TCP_PORT, UDP_PORT, meta_data, vec![], &log);
let peer_id = PeerId::random();
network_globals.peers.write().connect_ingoing(
&peer_id,
EXTERNAL_ADDR.parse().unwrap(),
None,
);
*network_globals.sync_state.write() = SyncState::Synced;
let eth1_service =
eth1::Service::new(eth1::Config::default(), log.clone(), chain.spec.clone());
let context = Arc::new(Context {
config: Config {
enabled: true,
listen_addr: Ipv4Addr::new(127, 0, 0, 1),
listen_port: 0,
allow_origin: None,
},
chain: Some(chain.clone()),
network_tx: Some(network_tx),
network_globals: Some(Arc::new(network_globals)),
eth1_service: Some(eth1_service),
log,
});
let ctx = context.clone();
let (shutdown_tx, shutdown_rx) = oneshot::channel();
let server_shutdown = async {
// It's not really interesting why this triggered, just that it happened.
let _ = shutdown_rx.await;
};
let (listening_socket, server) = http_api::serve(ctx, server_shutdown).unwrap();
tokio::spawn(async { server.await });
let client = BeaconNodeHttpClient::new(
Url::parse(&format!(
"http://{}:{}",
listening_socket.ip(),
listening_socket.port()
))
.unwrap(),
);
Self {
chain,
client,
next_block,
attestations,
attester_slashing,
proposer_slashing,
voluntary_exit,
_server_shutdown: shutdown_tx,
validator_keypairs: harness.validator_keypairs,
network_rx,
local_enr: enr_clone,
external_peer_id: peer_id,
}
}
fn skip_slots(self, count: u64) -> Self {
for _ in 0..count {
self.chain
@@ -1376,8 +1484,17 @@ impl ApiTester {
.client
.post_validator_duties_attester(epoch, indices.as_slice())
.await
.unwrap();
let dependent_root = self
.chain
.root_at_slot((epoch - 1).start_slot(E::slots_per_epoch()) - 1)
.unwrap()
.data;
.unwrap_or(self.chain.head_beacon_block_root().unwrap());
assert_eq!(results.dependent_root, dependent_root);
let result_duties = results.data;
let mut state = self
.chain
@@ -1395,7 +1512,7 @@ impl ApiTester {
.filter(|i| **i < state.validators.len() as u64)
.count();
assert_eq!(results.len(), expected_len);
assert_eq!(result_duties.len(), expected_len);
for (indices_set, &i) in indices.iter().enumerate() {
if let Some(duty) = state
@@ -1412,7 +1529,7 @@ impl ApiTester {
slot: duty.slot,
};
let result = results
let result = result_duties
.iter()
.find(|duty| duty.validator_index == i)
.unwrap();
@@ -1424,7 +1541,7 @@ impl ApiTester {
);
} else {
assert!(
!results.iter().any(|duty| duty.validator_index == i),
!result_duties.iter().any(|duty| duty.validator_index == i),
"validator index should not exist in response"
);
}
@@ -1438,12 +1555,17 @@ impl ApiTester {
pub async fn test_get_validator_duties_proposer(self) -> Self {
let current_epoch = self.chain.epoch().unwrap();
let dependent_root = self
.chain
.root_at_slot(current_epoch.start_slot(E::slots_per_epoch()) - 1)
.unwrap()
.unwrap_or(self.chain.head_beacon_block_root().unwrap());
let result = self
.client
.get_validator_duties_proposer(current_epoch)
.await
.unwrap()
.data;
.unwrap();
let mut state = self.chain.head_beacon_state().unwrap();
@@ -1455,7 +1577,7 @@ impl ApiTester {
.build_committee_cache(RelativeEpoch::Current, &self.chain.spec)
.unwrap();
let expected = current_epoch
let expected_duties = current_epoch
.slot_iter(E::slots_per_epoch())
.map(|slot| {
let index = state
@@ -1471,6 +1593,11 @@ impl ApiTester {
})
.collect::<Vec<_>>();
let expected = DutiesResponse {
data: expected_duties,
dependent_root,
};
assert_eq!(result, expected);
self
@@ -1824,6 +1951,185 @@ impl ApiTester {
self
}
pub async fn test_get_events(self) -> Self {
// Subscribe to all events
let topics = vec![
EventTopic::Attestation,
EventTopic::VoluntaryExit,
EventTopic::Block,
EventTopic::Head,
EventTopic::FinalizedCheckpoint,
];
let mut events_future = self
.client
.get_events::<E>(topics.as_slice())
.await
.unwrap();
let expected_attestation_len = self.attestations.len();
self.client
.post_beacon_pool_attestations(self.attestations.as_slice())
.await
.unwrap();
let attestation_events = poll_events(
&mut events_future,
expected_attestation_len,
Duration::from_millis(10000),
)
.await;
assert_eq!(
attestation_events.as_slice(),
self.attestations
.clone()
.into_iter()
.map(|attestation| EventKind::Attestation(attestation))
.collect::<Vec<_>>()
.as_slice()
);
// Produce a voluntary exit event
self.client
.post_beacon_pool_voluntary_exits(&self.voluntary_exit)
.await
.unwrap();
let exit_events = poll_events(&mut events_future, 1, Duration::from_millis(10000)).await;
assert_eq!(
exit_events.as_slice(),
&[EventKind::VoluntaryExit(self.voluntary_exit.clone())]
);
// Submit the next block, which is on an epoch boundary, so this will produce a finalized
// checkpoint event, head event, and block event
let block_root = self.next_block.canonical_root();
// current_duty_dependent_root = block root because this is the first slot of the epoch
let current_duty_dependent_root = self.chain.head_beacon_block_root().unwrap();
let current_slot = self.chain.slot().unwrap();
let next_slot = self.next_block.slot();
let finalization_distance = E::slots_per_epoch() * 2;
let expected_block = EventKind::Block(SseBlock {
block: block_root,
slot: next_slot,
});
let expected_head = EventKind::Head(SseHead {
block: block_root,
slot: next_slot,
state: self.next_block.state_root(),
current_duty_dependent_root,
previous_duty_dependent_root: self
.chain
.root_at_slot(current_slot - E::slots_per_epoch())
.unwrap()
.unwrap(),
epoch_transition: true,
});
let expected_finalized = EventKind::FinalizedCheckpoint(SseFinalizedCheckpoint {
block: self
.chain
.root_at_slot(next_slot - finalization_distance)
.unwrap()
.unwrap(),
state: self
.chain
.state_root_at_slot(next_slot - finalization_distance)
.unwrap()
.unwrap(),
epoch: Epoch::new(3),
});
self.client
.post_beacon_blocks(&self.next_block)
.await
.unwrap();
let block_events = poll_events(&mut events_future, 3, Duration::from_millis(10000)).await;
assert_eq!(
block_events.as_slice(),
&[expected_block, expected_finalized, expected_head]
);
self
}
pub async fn test_get_events_from_genesis(self) -> Self {
let topics = vec![EventTopic::Block, EventTopic::Head];
let mut events_future = self
.client
.get_events::<E>(topics.as_slice())
.await
.unwrap();
let block_root = self.next_block.canonical_root();
let next_slot = self.next_block.slot();
let expected_block = EventKind::Block(SseBlock {
block: block_root,
slot: next_slot,
});
let expected_head = EventKind::Head(SseHead {
block: block_root,
slot: next_slot,
state: self.next_block.state_root(),
current_duty_dependent_root: self.chain.genesis_block_root,
previous_duty_dependent_root: self.chain.genesis_block_root,
epoch_transition: false,
});
self.client
.post_beacon_blocks(&self.next_block)
.await
.unwrap();
let block_events = poll_events(&mut events_future, 2, Duration::from_millis(10000)).await;
assert_eq!(block_events.as_slice(), &[expected_block, expected_head]);
self
}
}
async fn poll_events<S: Stream<Item = Result<EventKind<T>, eth2::Error>> + Unpin, T: EthSpec>(
stream: &mut S,
num_events: usize,
timeout: Duration,
) -> Vec<EventKind<T>> {
let mut events = Vec::new();
let collect_stream_fut = async {
loop {
if let Some(result) = stream.next().await {
events.push(result.unwrap());
if events.len() == num_events {
return;
}
}
}
};
tokio::select! {
_ = collect_stream_fut => {return events}
_ = tokio::time::sleep(timeout) => { return events; }
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_events() {
ApiTester::new().test_get_events().compat().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_events_from_genesis() {
ApiTester::new_from_genesis()
.test_get_events_from_genesis()
.compat()
.await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]