mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-11 04:31:51 +00:00
Update to tokio 1.1 (#2172)
## Issue Addressed resolves #2129 resolves #2099 addresses some of #1712 unblocks #2076 unblocks #2153 ## Proposed Changes - Updates all the dependencies mentioned in #2129, except for web3. They haven't merged their tokio 1.0 update because they are waiting on some dependencies of their own. Since we only use web3 in tests, I think updating it in a separate issue is fine. If they are able to merge soon though, I can update in this PR. - Updates `tokio_util` to 0.6.2 and `bytes` to 1.0.1. - We haven't made a discv5 release since merging tokio 1.0 updates so I'm using a commit rather than release atm. **Edit:** I think we should merge an update of `tokio_util` to 0.6.2 into discv5 before this release because it has panic fixes in `DelayQueue` --> PR in discv5: https://github.com/sigp/discv5/pull/58 ## Additional Info tokio 1.0 changes that required some changes in lighthouse: - `interval.next().await.is_some()` -> `interval.tick().await` - `sleep` future is now `!Unpin` -> https://github.com/tokio-rs/tokio/issues/3028 - `try_recv` has been temporarily removed from `mpsc` -> https://github.com/tokio-rs/tokio/issues/3350 - stream features have moved to `tokio-stream` and `broadcast::Receiver::into_stream()` has been temporarily removed -> `https://github.com/tokio-rs/tokio/issues/2870 - I've copied over the `BroadcastStream` wrapper from this PR, but can update to use `tokio-stream` once it's merged https://github.com/tokio-rs/tokio/pull/3384 Co-authored-by: realbigsean <seananderson33@gmail.com>
This commit is contained in:
66
beacon_node/http_api/src/broadcast_stream.rs
Normal file
66
beacon_node/http_api/src/broadcast_stream.rs
Normal file
@@ -0,0 +1,66 @@
|
||||
// TODO: this should be replaced with the tokio's `BroadcastStream` once it's added to
|
||||
// tokio-stream (https://github.com/tokio-rs/tokio/pull/3384)
|
||||
|
||||
use std::fmt;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
use tokio::sync::broadcast::Receiver;
|
||||
use tokio_stream::Stream;
|
||||
use tokio_util::sync::ReusableBoxFuture;
|
||||
|
||||
/// A wrapper around [`tokio::sync::broadcast::Receiver`] that implements [`Stream`].
|
||||
///
|
||||
/// [`tokio::sync::broadcast::Receiver`]: struct@tokio::sync::broadcast::Receiver
|
||||
/// [`Stream`]: trait@crate::Stream
|
||||
pub struct BroadcastStream<T> {
|
||||
inner: ReusableBoxFuture<(Result<T, RecvError>, Receiver<T>)>,
|
||||
}
|
||||
|
||||
/// An error returned from the inner stream of a [`BroadcastStream`].
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum BroadcastStreamRecvError {
|
||||
/// The receiver lagged too far behind. Attempting to receive again will
|
||||
/// return the oldest message still retained by the channel.
|
||||
///
|
||||
/// Includes the number of skipped messages.
|
||||
Lagged(u64),
|
||||
}
|
||||
|
||||
async fn make_future<T: Clone>(mut rx: Receiver<T>) -> (Result<T, RecvError>, Receiver<T>) {
|
||||
let result = rx.recv().await;
|
||||
(result, rx)
|
||||
}
|
||||
|
||||
impl<T: 'static + Clone + Send> BroadcastStream<T> {
|
||||
/// Create a new `BroadcastStream`.
|
||||
pub fn new(rx: Receiver<T>) -> Self {
|
||||
Self {
|
||||
inner: ReusableBoxFuture::new(make_future(rx)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
|
||||
type Item = Result<T, BroadcastStreamRecvError>;
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let (result, rx) = match self.inner.poll(cx) {
|
||||
std::task::Poll::Ready(t) => t,
|
||||
std::task::Poll::Pending => return std::task::Poll::Pending,
|
||||
};
|
||||
self.inner.set(make_future(rx));
|
||||
match result {
|
||||
Ok(item) => Poll::Ready(Some(Ok(item))),
|
||||
Err(RecvError::Closed) => Poll::Ready(None),
|
||||
Err(RecvError::Lagged(n)) => {
|
||||
Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(n))))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> fmt::Debug for BroadcastStream<T> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("BroadcastStream").finish()
|
||||
}
|
||||
}
|
||||
@@ -7,6 +7,7 @@
|
||||
|
||||
mod beacon_proposer_cache;
|
||||
mod block_id;
|
||||
mod broadcast_stream;
|
||||
mod metrics;
|
||||
mod state_id;
|
||||
mod validator_inclusion;
|
||||
@@ -18,7 +19,7 @@ use beacon_chain::{
|
||||
};
|
||||
use beacon_proposer_cache::BeaconProposerCache;
|
||||
use block_id::BlockId;
|
||||
use eth2::types::{self as api_types, EventKind, ValidatorId};
|
||||
use eth2::types::{self as api_types, ValidatorId};
|
||||
use eth2_libp2p::{types::SyncState, EnrExt, NetworkGlobals, PeerId, PubsubMessage};
|
||||
use lighthouse_version::version_with_platform;
|
||||
use network::NetworkMessage;
|
||||
@@ -34,19 +35,17 @@ use std::convert::TryInto;
|
||||
use std::future::Future;
|
||||
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
|
||||
use std::sync::Arc;
|
||||
use tokio::stream::{StreamExt, StreamMap};
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
use tokio_stream::StreamExt;
|
||||
use types::{
|
||||
Attestation, AttestationDuty, AttesterSlashing, CloneConfig, CommitteeCache, Epoch, EthSpec,
|
||||
Hash256, ProposerSlashing, PublicKey, PublicKeyBytes, RelativeEpoch, SignedAggregateAndProof,
|
||||
SignedBeaconBlock, SignedVoluntaryExit, Slot, YamlConfig,
|
||||
};
|
||||
use warp::http::StatusCode;
|
||||
use warp::sse::ServerSentEvent;
|
||||
use warp::sse::Event;
|
||||
use warp::Reply;
|
||||
use warp::{http::Response, Filter, Stream};
|
||||
use warp_utils::reject::ServerSentEventError;
|
||||
use warp::{http::Response, Filter};
|
||||
use warp_utils::task::{blocking_json_task, blocking_task};
|
||||
|
||||
const API_PREFIX: &str = "eth";
|
||||
@@ -1610,9 +1609,9 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
.and(warp::path("duties"))
|
||||
.and(warp::path("proposer"))
|
||||
.and(warp::path::param::<Epoch>().or_else(|_| async {
|
||||
Err(warp_utils::reject::custom_bad_request(
|
||||
"Invalid epoch".to_string(),
|
||||
))
|
||||
Err(warp_utils::reject::custom_bad_request(
|
||||
"Invalid epoch".to_string(),
|
||||
))
|
||||
}))
|
||||
.and(warp::path::end())
|
||||
.and(not_while_syncing_filter.clone())
|
||||
@@ -1637,7 +1636,7 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
if epoch == current_epoch {
|
||||
let dependent_root_slot = current_epoch
|
||||
.start_slot(T::EthSpec::slots_per_epoch()) - 1;
|
||||
let dependent_root = if dependent_root_slot > chain.best_slot().map_err(warp_utils::reject::beacon_chain_error)? {
|
||||
let dependent_root = if dependent_root_slot > chain.best_slot().map_err(warp_utils::reject::beacon_chain_error)? {
|
||||
chain.head_beacon_block_root().map_err(warp_utils::reject::beacon_chain_error)?
|
||||
} else {
|
||||
chain
|
||||
@@ -1649,7 +1648,7 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
beacon_proposer_cache
|
||||
.lock()
|
||||
.get_proposers(&chain, epoch)
|
||||
.map(|duties| api_types::DutiesResponse{ data: duties, dependent_root} )
|
||||
.map(|duties| api_types::DutiesResponse { data: duties, dependent_root })
|
||||
} else {
|
||||
let state =
|
||||
StateId::slot(epoch.start_slot(T::EthSpec::slots_per_epoch()))
|
||||
@@ -1657,7 +1656,7 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
|
||||
let dependent_root_slot = state.current_epoch()
|
||||
.start_slot(T::EthSpec::slots_per_epoch()) - 1;
|
||||
let dependent_root = if dependent_root_slot > chain.best_slot().map_err(warp_utils::reject::beacon_chain_error)? {
|
||||
let dependent_root = if dependent_root_slot > chain.best_slot().map_err(warp_utils::reject::beacon_chain_error)? {
|
||||
chain.head_beacon_block_root().map_err(warp_utils::reject::beacon_chain_error)?
|
||||
} else {
|
||||
chain
|
||||
@@ -1691,8 +1690,7 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
})
|
||||
.collect::<Result<Vec<api_types::ProposerData>, _>>()
|
||||
.map(|duties| {
|
||||
|
||||
api_types::DutiesResponse{
|
||||
api_types::DutiesResponse {
|
||||
dependent_root,
|
||||
data: duties,
|
||||
}
|
||||
@@ -2053,7 +2051,7 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
"attestation_slot" => aggregate.message.aggregate.data.slot,
|
||||
);
|
||||
failures.push(api_types::Failure::new(index, format!("Verification: {:?}", e)));
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2087,7 +2085,7 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
|
||||
if !failures.is_empty() {
|
||||
Err(warp_utils::reject::indexed_bad_request("error processing aggregate and proofs".to_string(),
|
||||
failures
|
||||
failures,
|
||||
))
|
||||
} else {
|
||||
Ok(())
|
||||
@@ -2358,24 +2356,6 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
})
|
||||
});
|
||||
|
||||
fn merge_streams<T: EthSpec>(
|
||||
stream_map: StreamMap<
|
||||
String,
|
||||
impl Stream<Item = Result<EventKind<T>, RecvError>> + Unpin + Send + 'static,
|
||||
>,
|
||||
) -> impl Stream<Item = Result<impl ServerSentEvent + Send + 'static, ServerSentEventError>>
|
||||
+ Send
|
||||
+ 'static {
|
||||
// Convert messages into Server-Sent Events and return resulting stream.
|
||||
stream_map.map(move |(topic_name, msg)| match msg {
|
||||
Ok(data) => Ok((warp::sse::event(topic_name), warp::sse::json(data)).boxed()),
|
||||
Err(e) => Err(warp_utils::reject::server_sent_event_error(format!(
|
||||
"{:?}",
|
||||
e
|
||||
))),
|
||||
})
|
||||
}
|
||||
|
||||
let get_events = eth1_v1
|
||||
.and(warp::path("events"))
|
||||
.and(warp::path::end())
|
||||
@@ -2385,7 +2365,7 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
|topics: api_types::EventQuery, chain: Arc<BeaconChain<T>>| {
|
||||
blocking_task(move || {
|
||||
// for each topic subscribed spawn a new subscription
|
||||
let mut stream_map = StreamMap::with_capacity(topics.topics.0.len());
|
||||
let mut receivers = Vec::with_capacity(topics.topics.0.len());
|
||||
|
||||
if let Some(event_handler) = chain.event_handler.as_ref() {
|
||||
for topic in topics.topics.0.clone() {
|
||||
@@ -2402,7 +2382,24 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
event_handler.subscribe_finalized()
|
||||
}
|
||||
};
|
||||
stream_map.insert(topic.to_string(), Box::pin(receiver.into_stream()));
|
||||
|
||||
receivers.push(broadcast_stream::BroadcastStream::new(receiver).map(
|
||||
|msg| {
|
||||
match msg {
|
||||
Ok(data) => Event::default()
|
||||
.event(data.topic_name())
|
||||
.json_data(data)
|
||||
.map_err(|e| {
|
||||
warp_utils::reject::server_sent_event_error(
|
||||
format!("{:?}", e),
|
||||
)
|
||||
}),
|
||||
Err(e) => Err(warp_utils::reject::server_sent_event_error(
|
||||
format!("{:?}", e),
|
||||
)),
|
||||
}
|
||||
},
|
||||
));
|
||||
}
|
||||
} else {
|
||||
return Err(warp_utils::reject::custom_server_error(
|
||||
@@ -2410,11 +2407,9 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
));
|
||||
}
|
||||
|
||||
let stream = merge_streams(stream_map);
|
||||
let s = futures::stream::select_all(receivers);
|
||||
|
||||
Ok::<_, warp::Rejection>(warp::sse::reply(
|
||||
warp::sse::keep_alive().stream(stream),
|
||||
))
|
||||
Ok::<_, warp::Rejection>(warp::sse::reply(warp::sse::keep_alive().stream(s)))
|
||||
})
|
||||
},
|
||||
);
|
||||
|
||||
Reference in New Issue
Block a user