mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-04 17:21:44 +00:00
Implement status v2 version (#7590)
N/A Implements status v2 as defined in https://github.com/ethereum/consensus-specs/pull/4374/
This commit is contained in:
@@ -746,6 +746,7 @@ impl<E: EthSpec> PeerDB<E> {
|
||||
head_root: Hash256::ZERO,
|
||||
finalized_epoch: Epoch::new(0),
|
||||
finalized_root: Hash256::ZERO,
|
||||
earliest_available_slot: Some(Slot::new(0)),
|
||||
},
|
||||
},
|
||||
);
|
||||
|
||||
@@ -25,6 +25,7 @@ pub struct SyncInfo {
|
||||
pub head_root: Hash256,
|
||||
pub finalized_epoch: Epoch,
|
||||
pub finalized_root: Hash256,
|
||||
pub earliest_available_slot: Option<Slot>,
|
||||
}
|
||||
|
||||
impl std::cmp::PartialEq for SyncStatus {
|
||||
|
||||
@@ -67,7 +67,13 @@ impl<E: EthSpec> SSZSnappyInboundCodec<E> {
|
||||
) -> Result<(), RPCError> {
|
||||
let bytes = match &item {
|
||||
RpcResponse::Success(resp) => match &resp {
|
||||
RpcSuccessResponse::Status(res) => res.as_ssz_bytes(),
|
||||
RpcSuccessResponse::Status(res) => match self.protocol.versioned_protocol {
|
||||
SupportedProtocol::StatusV1 => res.status_v1().as_ssz_bytes(),
|
||||
SupportedProtocol::StatusV2 => res.status_v2().as_ssz_bytes(),
|
||||
_ => {
|
||||
unreachable!("We only send status responses on negotiating status protocol")
|
||||
}
|
||||
},
|
||||
RpcSuccessResponse::BlocksByRange(res) => res.as_ssz_bytes(),
|
||||
RpcSuccessResponse::BlocksByRoot(res) => res.as_ssz_bytes(),
|
||||
RpcSuccessResponse::BlobsByRange(res) => res.as_ssz_bytes(),
|
||||
@@ -329,7 +335,16 @@ impl<E: EthSpec> Encoder<RequestType<E>> for SSZSnappyOutboundCodec<E> {
|
||||
|
||||
fn encode(&mut self, item: RequestType<E>, dst: &mut BytesMut) -> Result<(), Self::Error> {
|
||||
let bytes = match item {
|
||||
RequestType::Status(req) => req.as_ssz_bytes(),
|
||||
RequestType::Status(req) => {
|
||||
// Send the status message based on the negotiated protocol
|
||||
match self.protocol.versioned_protocol {
|
||||
SupportedProtocol::StatusV1 => req.status_v1().as_ssz_bytes(),
|
||||
SupportedProtocol::StatusV2 => req.status_v2().as_ssz_bytes(),
|
||||
_ => {
|
||||
unreachable!("We only send status requests on negotiating status protocol")
|
||||
}
|
||||
}
|
||||
}
|
||||
RequestType::Goodbye(req) => req.as_ssz_bytes(),
|
||||
RequestType::BlocksByRange(r) => match r {
|
||||
OldBlocksByRangeRequest::V1(req) => req.as_ssz_bytes(),
|
||||
@@ -553,9 +568,12 @@ fn handle_rpc_request<E: EthSpec>(
|
||||
spec: &ChainSpec,
|
||||
) -> Result<Option<RequestType<E>>, RPCError> {
|
||||
match versioned_protocol {
|
||||
SupportedProtocol::StatusV1 => Ok(Some(RequestType::Status(
|
||||
StatusMessage::from_ssz_bytes(decoded_buffer)?,
|
||||
))),
|
||||
SupportedProtocol::StatusV1 => Ok(Some(RequestType::Status(StatusMessage::V1(
|
||||
StatusMessageV1::from_ssz_bytes(decoded_buffer)?,
|
||||
)))),
|
||||
SupportedProtocol::StatusV2 => Ok(Some(RequestType::Status(StatusMessage::V2(
|
||||
StatusMessageV2::from_ssz_bytes(decoded_buffer)?,
|
||||
)))),
|
||||
SupportedProtocol::GoodbyeV1 => Ok(Some(RequestType::Goodbye(
|
||||
GoodbyeReason::from_ssz_bytes(decoded_buffer)?,
|
||||
))),
|
||||
@@ -666,9 +684,12 @@ fn handle_rpc_response<E: EthSpec>(
|
||||
fork_name: Option<ForkName>,
|
||||
) -> Result<Option<RpcSuccessResponse<E>>, RPCError> {
|
||||
match versioned_protocol {
|
||||
SupportedProtocol::StatusV1 => Ok(Some(RpcSuccessResponse::Status(
|
||||
StatusMessage::from_ssz_bytes(decoded_buffer)?,
|
||||
))),
|
||||
SupportedProtocol::StatusV1 => Ok(Some(RpcSuccessResponse::Status(StatusMessage::V1(
|
||||
StatusMessageV1::from_ssz_bytes(decoded_buffer)?,
|
||||
)))),
|
||||
SupportedProtocol::StatusV2 => Ok(Some(RpcSuccessResponse::Status(StatusMessage::V2(
|
||||
StatusMessageV2::from_ssz_bytes(decoded_buffer)?,
|
||||
)))),
|
||||
// This case should be unreachable as `Goodbye` has no response.
|
||||
SupportedProtocol::GoodbyeV1 => Err(RPCError::InvalidData(
|
||||
"Goodbye RPC message has no valid response".to_string(),
|
||||
@@ -1036,14 +1057,25 @@ mod tests {
|
||||
SignedBeaconBlock::from_block(block, Signature::empty())
|
||||
}
|
||||
|
||||
fn status_message() -> StatusMessage {
|
||||
StatusMessage {
|
||||
fn status_message_v1() -> StatusMessage {
|
||||
StatusMessage::V1(StatusMessageV1 {
|
||||
fork_digest: [0; 4],
|
||||
finalized_root: Hash256::zero(),
|
||||
finalized_epoch: Epoch::new(1),
|
||||
head_root: Hash256::zero(),
|
||||
head_slot: Slot::new(1),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn status_message_v2() -> StatusMessage {
|
||||
StatusMessage::V2(StatusMessageV2 {
|
||||
fork_digest: [0; 4],
|
||||
finalized_root: Hash256::zero(),
|
||||
finalized_epoch: Epoch::new(1),
|
||||
head_root: Hash256::zero(),
|
||||
head_slot: Slot::new(1),
|
||||
earliest_available_slot: Slot::new(0),
|
||||
})
|
||||
}
|
||||
|
||||
fn bbrange_request_v1() -> OldBlocksByRangeRequest {
|
||||
@@ -1284,11 +1316,22 @@ mod tests {
|
||||
assert_eq!(
|
||||
encode_then_decode_response(
|
||||
SupportedProtocol::StatusV1,
|
||||
RpcResponse::Success(RpcSuccessResponse::Status(status_message())),
|
||||
RpcResponse::Success(RpcSuccessResponse::Status(status_message_v1())),
|
||||
ForkName::Base,
|
||||
&chain_spec,
|
||||
),
|
||||
Ok(Some(RpcSuccessResponse::Status(status_message())))
|
||||
Ok(Some(RpcSuccessResponse::Status(status_message_v1())))
|
||||
);
|
||||
|
||||
// A StatusV2 still encodes as a StatusV1 since version is Version::V1
|
||||
assert_eq!(
|
||||
encode_then_decode_response(
|
||||
SupportedProtocol::StatusV1,
|
||||
RpcResponse::Success(RpcSuccessResponse::Status(status_message_v2())),
|
||||
ForkName::Fulu,
|
||||
&chain_spec,
|
||||
),
|
||||
Ok(Some(RpcSuccessResponse::Status(status_message_v1())))
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
@@ -1716,6 +1759,27 @@ mod tests {
|
||||
),
|
||||
Ok(Some(RpcSuccessResponse::MetaData(metadata_v2())))
|
||||
);
|
||||
|
||||
// A StatusV1 still encodes as a StatusV2 since version is Version::V2
|
||||
assert_eq!(
|
||||
encode_then_decode_response(
|
||||
SupportedProtocol::StatusV2,
|
||||
RpcResponse::Success(RpcSuccessResponse::Status(status_message_v1())),
|
||||
ForkName::Fulu,
|
||||
&chain_spec,
|
||||
),
|
||||
Ok(Some(RpcSuccessResponse::Status(status_message_v2())))
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
encode_then_decode_response(
|
||||
SupportedProtocol::StatusV2,
|
||||
RpcResponse::Success(RpcSuccessResponse::Status(status_message_v2())),
|
||||
ForkName::Fulu,
|
||||
&chain_spec,
|
||||
),
|
||||
Ok(Some(RpcSuccessResponse::Status(status_message_v2())))
|
||||
);
|
||||
}
|
||||
|
||||
// Test RPCResponse encoding/decoding for V2 messages
|
||||
@@ -1901,7 +1965,8 @@ mod tests {
|
||||
|
||||
let requests: &[RequestType<Spec>] = &[
|
||||
RequestType::Ping(ping_message()),
|
||||
RequestType::Status(status_message()),
|
||||
RequestType::Status(status_message_v1()),
|
||||
RequestType::Status(status_message_v2()),
|
||||
RequestType::Goodbye(GoodbyeReason::Fault),
|
||||
RequestType::BlocksByRange(bbrange_request_v1()),
|
||||
RequestType::BlocksByRange(bbrange_request_v2()),
|
||||
@@ -1948,7 +2013,7 @@ mod tests {
|
||||
let malicious_padding: &'static [u8] = b"\xFE\x00\x00\x00";
|
||||
|
||||
// Status message is 84 bytes uncompressed. `max_compressed_len` is 32 + 84 + 84/6 = 130.
|
||||
let status_message_bytes = StatusMessage {
|
||||
let status_message_bytes = StatusMessageV1 {
|
||||
fork_digest: [0; 4],
|
||||
finalized_root: Hash256::zero(),
|
||||
finalized_epoch: Epoch::new(1),
|
||||
@@ -2071,7 +2136,7 @@ mod tests {
|
||||
assert_eq!(stream_identifier.len(), 10);
|
||||
|
||||
// Status message is 84 bytes uncompressed. `max_compressed_len` is 32 + 84 + 84/6 = 130.
|
||||
let status_message_bytes = StatusMessage {
|
||||
let status_message_bytes = StatusMessageV1 {
|
||||
fork_digest: [0; 4],
|
||||
finalized_root: Hash256::zero(),
|
||||
finalized_epoch: Epoch::new(1),
|
||||
|
||||
@@ -63,7 +63,11 @@ impl Display for ErrorType {
|
||||
/* Requests */
|
||||
|
||||
/// The STATUS request/response handshake message.
|
||||
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
|
||||
#[superstruct(
|
||||
variants(V1, V2),
|
||||
variant_attributes(derive(Encode, Decode, Clone, Debug, PartialEq),)
|
||||
)]
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub struct StatusMessage {
|
||||
/// The fork version of the chain we are broadcasting.
|
||||
pub fork_digest: [u8; 4],
|
||||
@@ -79,6 +83,43 @@ pub struct StatusMessage {
|
||||
|
||||
/// The slot associated with the latest block root.
|
||||
pub head_slot: Slot,
|
||||
|
||||
/// The slot after which we guarantee to have all the blocks
|
||||
/// and blobs/data columns that we currently advertise.
|
||||
#[superstruct(only(V2))]
|
||||
pub earliest_available_slot: Slot,
|
||||
}
|
||||
|
||||
impl StatusMessage {
|
||||
pub fn status_v1(&self) -> StatusMessageV1 {
|
||||
match &self {
|
||||
Self::V1(status) => status.clone(),
|
||||
Self::V2(status) => StatusMessageV1 {
|
||||
fork_digest: status.fork_digest,
|
||||
finalized_root: status.finalized_root,
|
||||
finalized_epoch: status.finalized_epoch,
|
||||
head_root: status.head_root,
|
||||
head_slot: status.head_slot,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn status_v2(&self) -> StatusMessageV2 {
|
||||
match &self {
|
||||
Self::V1(status) => StatusMessageV2 {
|
||||
fork_digest: status.fork_digest,
|
||||
finalized_root: status.finalized_root,
|
||||
finalized_epoch: status.finalized_epoch,
|
||||
head_root: status.head_root,
|
||||
head_slot: status.head_slot,
|
||||
// Note: we always produce a V2 message as our local
|
||||
// status message, so this match arm should ideally never
|
||||
// be invoked in lighthouse.
|
||||
earliest_available_slot: Slot::new(0),
|
||||
},
|
||||
Self::V2(status) => status.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The PING request/response message.
|
||||
@@ -726,7 +767,7 @@ impl std::fmt::Display for RpcErrorResponse {
|
||||
|
||||
impl std::fmt::Display for StatusMessage {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "Status Message: Fork Digest: {:?}, Finalized Root: {}, Finalized Epoch: {}, Head Root: {}, Head Slot: {}", self.fork_digest, self.finalized_root, self.finalized_epoch, self.head_root, self.head_slot)
|
||||
write!(f, "Status Message: Fork Digest: {:?}, Finalized Root: {}, Finalized Epoch: {}, Head Root: {}, Head Slot: {}, Earliest available slot: {:?}", self.fork_digest(), self.finalized_root(), self.finalized_epoch(), self.head_root(), self.head_slot(), self.earliest_available_slot())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -298,6 +298,7 @@ pub enum Encoding {
|
||||
#[derive(Debug, Clone, Copy, PartialEq)]
|
||||
pub enum SupportedProtocol {
|
||||
StatusV1,
|
||||
StatusV2,
|
||||
GoodbyeV1,
|
||||
BlocksByRangeV1,
|
||||
BlocksByRangeV2,
|
||||
@@ -321,6 +322,7 @@ impl SupportedProtocol {
|
||||
pub fn version_string(&self) -> &'static str {
|
||||
match self {
|
||||
SupportedProtocol::StatusV1 => "1",
|
||||
SupportedProtocol::StatusV2 => "2",
|
||||
SupportedProtocol::GoodbyeV1 => "1",
|
||||
SupportedProtocol::BlocksByRangeV1 => "1",
|
||||
SupportedProtocol::BlocksByRangeV2 => "2",
|
||||
@@ -344,6 +346,7 @@ impl SupportedProtocol {
|
||||
pub fn protocol(&self) -> Protocol {
|
||||
match self {
|
||||
SupportedProtocol::StatusV1 => Protocol::Status,
|
||||
SupportedProtocol::StatusV2 => Protocol::Status,
|
||||
SupportedProtocol::GoodbyeV1 => Protocol::Goodbye,
|
||||
SupportedProtocol::BlocksByRangeV1 => Protocol::BlocksByRange,
|
||||
SupportedProtocol::BlocksByRangeV2 => Protocol::BlocksByRange,
|
||||
@@ -368,6 +371,7 @@ impl SupportedProtocol {
|
||||
|
||||
fn currently_supported(fork_context: &ForkContext) -> Vec<ProtocolId> {
|
||||
let mut supported = vec![
|
||||
ProtocolId::new(Self::StatusV2, Encoding::SSZSnappy),
|
||||
ProtocolId::new(Self::StatusV1, Encoding::SSZSnappy),
|
||||
ProtocolId::new(Self::GoodbyeV1, Encoding::SSZSnappy),
|
||||
// V2 variants have higher preference then V1
|
||||
@@ -492,8 +496,8 @@ impl ProtocolId {
|
||||
pub fn rpc_request_limits(&self, spec: &ChainSpec) -> RpcLimits {
|
||||
match self.versioned_protocol.protocol() {
|
||||
Protocol::Status => RpcLimits::new(
|
||||
<StatusMessage as Encode>::ssz_fixed_len(),
|
||||
<StatusMessage as Encode>::ssz_fixed_len(),
|
||||
<StatusMessageV1 as Encode>::ssz_fixed_len(),
|
||||
<StatusMessageV2 as Encode>::ssz_fixed_len(),
|
||||
),
|
||||
Protocol::Goodbye => RpcLimits::new(
|
||||
<GoodbyeReason as Encode>::ssz_fixed_len(),
|
||||
@@ -537,8 +541,8 @@ impl ProtocolId {
|
||||
pub fn rpc_response_limits<E: EthSpec>(&self, fork_context: &ForkContext) -> RpcLimits {
|
||||
match self.versioned_protocol.protocol() {
|
||||
Protocol::Status => RpcLimits::new(
|
||||
<StatusMessage as Encode>::ssz_fixed_len(),
|
||||
<StatusMessage as Encode>::ssz_fixed_len(),
|
||||
<StatusMessageV1 as Encode>::ssz_fixed_len(),
|
||||
<StatusMessageV2 as Encode>::ssz_fixed_len(),
|
||||
),
|
||||
Protocol::Goodbye => RpcLimits::new(0, 0), // Goodbye request has no response
|
||||
Protocol::BlocksByRange => rpc_block_limits_by_fork(fork_context.current_fork()),
|
||||
@@ -589,6 +593,7 @@ impl ProtocolId {
|
||||
| SupportedProtocol::LightClientFinalityUpdateV1
|
||||
| SupportedProtocol::LightClientUpdatesByRangeV1 => true,
|
||||
SupportedProtocol::StatusV1
|
||||
| SupportedProtocol::StatusV2
|
||||
| SupportedProtocol::BlocksByRootV1
|
||||
| SupportedProtocol::BlocksByRangeV1
|
||||
| SupportedProtocol::PingV1
|
||||
@@ -758,7 +763,10 @@ impl<E: EthSpec> RequestType<E> {
|
||||
/// Gives the corresponding `SupportedProtocol` to this request.
|
||||
pub fn versioned_protocol(&self) -> SupportedProtocol {
|
||||
match self {
|
||||
RequestType::Status(_) => SupportedProtocol::StatusV1,
|
||||
RequestType::Status(req) => match req {
|
||||
StatusMessage::V1(_) => SupportedProtocol::StatusV1,
|
||||
StatusMessage::V2(_) => SupportedProtocol::StatusV2,
|
||||
},
|
||||
RequestType::Goodbye(_) => SupportedProtocol::GoodbyeV1,
|
||||
RequestType::BlocksByRange(req) => match req {
|
||||
OldBlocksByRangeRequest::V1(_) => SupportedProtocol::BlocksByRangeV1,
|
||||
@@ -817,10 +825,10 @@ impl<E: EthSpec> RequestType<E> {
|
||||
pub fn supported_protocols(&self) -> Vec<ProtocolId> {
|
||||
match self {
|
||||
// add more protocols when versions/encodings are supported
|
||||
RequestType::Status(_) => vec![ProtocolId::new(
|
||||
SupportedProtocol::StatusV1,
|
||||
Encoding::SSZSnappy,
|
||||
)],
|
||||
RequestType::Status(_) => vec![
|
||||
ProtocolId::new(SupportedProtocol::StatusV1, Encoding::SSZSnappy),
|
||||
ProtocolId::new(SupportedProtocol::StatusV2, Encoding::SSZSnappy),
|
||||
],
|
||||
RequestType::Goodbye(_) => vec![ProtocolId::new(
|
||||
SupportedProtocol::GoodbyeV1,
|
||||
Encoding::SSZSnappy,
|
||||
|
||||
@@ -75,22 +75,22 @@ fn test_tcp_status_rpc() {
|
||||
.await;
|
||||
|
||||
// Dummy STATUS RPC message
|
||||
let rpc_request = RequestType::Status(StatusMessage {
|
||||
let rpc_request = RequestType::Status(StatusMessage::V1(StatusMessageV1 {
|
||||
fork_digest: [0; 4],
|
||||
finalized_root: Hash256::zero(),
|
||||
finalized_epoch: Epoch::new(1),
|
||||
head_root: Hash256::zero(),
|
||||
head_slot: Slot::new(1),
|
||||
});
|
||||
}));
|
||||
|
||||
// Dummy STATUS RPC message
|
||||
let rpc_response = Response::Status(StatusMessage {
|
||||
let rpc_response = Response::Status(StatusMessage::V1(StatusMessageV1 {
|
||||
fork_digest: [0; 4],
|
||||
finalized_root: Hash256::zero(),
|
||||
finalized_epoch: Epoch::new(1),
|
||||
head_root: Hash256::zero(),
|
||||
head_slot: Slot::new(1),
|
||||
});
|
||||
}));
|
||||
|
||||
// build the sender future
|
||||
let sender_future = async {
|
||||
@@ -1199,22 +1199,22 @@ fn test_delayed_rpc_response() {
|
||||
.await;
|
||||
|
||||
// Dummy STATUS RPC message
|
||||
let rpc_request = RequestType::Status(StatusMessage {
|
||||
let rpc_request = RequestType::Status(StatusMessage::V1(StatusMessageV1 {
|
||||
fork_digest: [0; 4],
|
||||
finalized_root: Hash256::from_low_u64_be(0),
|
||||
finalized_epoch: Epoch::new(1),
|
||||
head_root: Hash256::from_low_u64_be(0),
|
||||
head_slot: Slot::new(1),
|
||||
});
|
||||
}));
|
||||
|
||||
// Dummy STATUS RPC message
|
||||
let rpc_response = Response::Status(StatusMessage {
|
||||
let rpc_response = Response::Status(StatusMessage::V1(StatusMessageV1 {
|
||||
fork_digest: [0; 4],
|
||||
finalized_root: Hash256::from_low_u64_be(0),
|
||||
finalized_epoch: Epoch::new(1),
|
||||
head_root: Hash256::from_low_u64_be(0),
|
||||
head_slot: Slot::new(1),
|
||||
});
|
||||
}));
|
||||
|
||||
// build the sender future
|
||||
let sender_future = async {
|
||||
@@ -1329,22 +1329,22 @@ fn test_active_requests() {
|
||||
.await;
|
||||
|
||||
// Dummy STATUS RPC request.
|
||||
let rpc_request = RequestType::Status(StatusMessage {
|
||||
let rpc_request = RequestType::Status(StatusMessage::V1(StatusMessageV1 {
|
||||
fork_digest: [0; 4],
|
||||
finalized_root: Hash256::from_low_u64_be(0),
|
||||
finalized_epoch: Epoch::new(1),
|
||||
head_root: Hash256::from_low_u64_be(0),
|
||||
head_slot: Slot::new(1),
|
||||
});
|
||||
}));
|
||||
|
||||
// Dummy STATUS RPC response.
|
||||
let rpc_response = Response::Status(StatusMessage {
|
||||
let rpc_response = Response::Status(StatusMessage::V1(StatusMessageV1 {
|
||||
fork_digest: [0; 4],
|
||||
finalized_root: Hash256::zero(),
|
||||
finalized_epoch: Epoch::new(1),
|
||||
head_root: Hash256::zero(),
|
||||
head_slot: Slot::new(1),
|
||||
});
|
||||
}));
|
||||
|
||||
// Number of requests.
|
||||
const REQUESTS: u8 = 10;
|
||||
|
||||
@@ -70,14 +70,14 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
let local = self.chain.status_message();
|
||||
let start_slot = |epoch: Epoch| epoch.start_slot(T::EthSpec::slots_per_epoch());
|
||||
|
||||
let irrelevant_reason = if local.fork_digest != remote.fork_digest {
|
||||
let irrelevant_reason = if local.fork_digest() != remote.fork_digest() {
|
||||
// The node is on a different network/fork
|
||||
Some(format!(
|
||||
"Incompatible forks Ours:{} Theirs:{}",
|
||||
hex::encode(local.fork_digest),
|
||||
hex::encode(remote.fork_digest)
|
||||
hex::encode(local.fork_digest()),
|
||||
hex::encode(remote.fork_digest())
|
||||
))
|
||||
} else if remote.head_slot
|
||||
} else if *remote.head_slot()
|
||||
> self
|
||||
.chain
|
||||
.slot()
|
||||
@@ -88,11 +88,11 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
// current slot. This could be because they are using a different genesis time, or that
|
||||
// their or our system's clock is incorrect.
|
||||
Some("Different system clocks or genesis time".to_string())
|
||||
} else if (remote.finalized_epoch == local.finalized_epoch
|
||||
&& remote.finalized_root == local.finalized_root)
|
||||
|| remote.finalized_root.is_zero()
|
||||
|| local.finalized_root.is_zero()
|
||||
|| remote.finalized_epoch > local.finalized_epoch
|
||||
} else if (remote.finalized_epoch() == local.finalized_epoch()
|
||||
&& remote.finalized_root() == local.finalized_root())
|
||||
|| remote.finalized_root().is_zero()
|
||||
|| local.finalized_root().is_zero()
|
||||
|| remote.finalized_epoch() > local.finalized_epoch()
|
||||
{
|
||||
// Fast path. Remote finalized checkpoint is either identical, or genesis, or we are at
|
||||
// genesis, or they are ahead. In all cases, we should allow this peer to connect to us
|
||||
@@ -100,7 +100,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
None
|
||||
} else {
|
||||
// Remote finalized epoch is less than ours.
|
||||
let remote_finalized_slot = start_slot(remote.finalized_epoch);
|
||||
let remote_finalized_slot = start_slot(*remote.finalized_epoch());
|
||||
if remote_finalized_slot < self.chain.store.get_oldest_block_slot() {
|
||||
// Peer's finalized checkpoint is older than anything in our DB. We are unlikely
|
||||
// to be able to help them sync.
|
||||
@@ -112,7 +112,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
if self
|
||||
.chain
|
||||
.block_root_at_slot(remote_finalized_slot, WhenSlotSkipped::Prev)
|
||||
.map(|root_opt| root_opt != Some(remote.finalized_root))
|
||||
.map(|root_opt| root_opt != Some(*remote.finalized_root()))
|
||||
.map_err(Box::new)?
|
||||
{
|
||||
Some("Different finalized chain".to_string())
|
||||
@@ -138,10 +138,11 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
}
|
||||
Ok(None) => {
|
||||
let info = SyncInfo {
|
||||
head_slot: status.head_slot,
|
||||
head_root: status.head_root,
|
||||
finalized_epoch: status.finalized_epoch,
|
||||
finalized_root: status.finalized_root,
|
||||
head_slot: *status.head_slot(),
|
||||
head_root: *status.head_root(),
|
||||
finalized_epoch: *status.finalized_epoch(),
|
||||
finalized_root: *status.finalized_root(),
|
||||
earliest_available_slot: status.earliest_available_slot().ok().cloned(),
|
||||
};
|
||||
self.send_sync_message(SyncMessage::AddPeer(peer_id, info));
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
use types::{EthSpec, FixedBytesExtended, Hash256};
|
||||
|
||||
use lighthouse_network::rpc::StatusMessage;
|
||||
use lighthouse_network::rpc::{methods::StatusMessageV2, StatusMessage};
|
||||
/// Trait to produce a `StatusMessage` representing the state of the given `beacon_chain`.
|
||||
///
|
||||
/// NOTE: The purpose of this is simply to obtain a `StatusMessage` from the `BeaconChain` without
|
||||
@@ -29,11 +29,14 @@ pub(crate) fn status_message<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>)
|
||||
finalized_checkpoint.root = Hash256::zero();
|
||||
}
|
||||
|
||||
StatusMessage {
|
||||
let earliest_available_slot = beacon_chain.store.get_anchor_info().oldest_block_slot;
|
||||
|
||||
StatusMessage::V2(StatusMessageV2 {
|
||||
fork_digest,
|
||||
finalized_root: finalized_checkpoint.root,
|
||||
finalized_epoch: finalized_checkpoint.epoch,
|
||||
head_root: cached_head.head_block_root(),
|
||||
head_slot: cached_head.head_slot(),
|
||||
}
|
||||
earliest_available_slot,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1243,6 +1243,7 @@ mod tests {
|
||||
head_root: Hash256::random(),
|
||||
finalized_epoch,
|
||||
finalized_root: Hash256::random(),
|
||||
earliest_available_slot: None,
|
||||
},
|
||||
},
|
||||
);
|
||||
|
||||
@@ -398,10 +398,11 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
// ensure the beacon chain still exists
|
||||
let status = self.chain.status_message();
|
||||
let local = SyncInfo {
|
||||
head_slot: status.head_slot,
|
||||
head_root: status.head_root,
|
||||
finalized_epoch: status.finalized_epoch,
|
||||
finalized_root: status.finalized_root,
|
||||
head_slot: *status.head_slot(),
|
||||
head_root: *status.head_root(),
|
||||
finalized_epoch: *status.finalized_epoch(),
|
||||
finalized_root: *status.finalized_root(),
|
||||
earliest_available_slot: status.earliest_available_slot().ok().cloned(),
|
||||
};
|
||||
|
||||
let sync_type = remote_sync_type(&local, &remote, &self.chain);
|
||||
@@ -450,10 +451,11 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
) {
|
||||
let status = self.chain.status_message();
|
||||
let local = SyncInfo {
|
||||
head_slot: status.head_slot,
|
||||
head_root: status.head_root,
|
||||
finalized_epoch: status.finalized_epoch,
|
||||
finalized_root: status.finalized_root,
|
||||
head_slot: *status.head_slot(),
|
||||
head_root: *status.head_root(),
|
||||
finalized_epoch: *status.finalized_epoch(),
|
||||
finalized_root: *status.finalized_root(),
|
||||
earliest_available_slot: status.earliest_available_slot().ok().cloned(),
|
||||
};
|
||||
|
||||
let head_slot = head_slot.unwrap_or_else(|| {
|
||||
@@ -471,6 +473,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
// Set finalized to same as local to trigger Head sync
|
||||
finalized_epoch: local.finalized_epoch,
|
||||
finalized_root: local.finalized_root,
|
||||
earliest_available_slot: local.earliest_available_slot,
|
||||
};
|
||||
|
||||
for peer_id in peers {
|
||||
|
||||
@@ -384,11 +384,12 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
for peer_id in peers {
|
||||
debug!(
|
||||
peer = %peer_id,
|
||||
fork_digest = ?status_message.fork_digest,
|
||||
finalized_root = ?status_message.finalized_root,
|
||||
finalized_epoch = ?status_message.finalized_epoch,
|
||||
head_root = %status_message.head_root,
|
||||
head_slot = %status_message.head_slot,
|
||||
fork_digest = ?status_message.fork_digest(),
|
||||
finalized_root = ?status_message.finalized_root(),
|
||||
finalized_epoch = ?status_message.finalized_epoch(),
|
||||
head_root = %status_message.head_root(),
|
||||
head_slot = %status_message.head_slot(),
|
||||
earliest_available_slot = ?status_message.earliest_available_slot(),
|
||||
"Sending Status Request"
|
||||
);
|
||||
|
||||
|
||||
@@ -411,10 +411,11 @@ where
|
||||
|
||||
let status = self.beacon_chain.status_message();
|
||||
let local = SyncInfo {
|
||||
head_slot: status.head_slot,
|
||||
head_root: status.head_root,
|
||||
finalized_epoch: status.finalized_epoch,
|
||||
finalized_root: status.finalized_root,
|
||||
head_slot: *status.head_slot(),
|
||||
head_root: *status.head_root(),
|
||||
finalized_epoch: *status.finalized_epoch(),
|
||||
finalized_root: *status.finalized_root(),
|
||||
earliest_available_slot: status.earliest_available_slot().ok().cloned(),
|
||||
};
|
||||
|
||||
// update the state of the collection
|
||||
|
||||
@@ -11,9 +11,9 @@ use beacon_chain::{block_verification_types::RpcBlock, EngineState, NotifyExecut
|
||||
use beacon_processor::WorkType;
|
||||
use lighthouse_network::rpc::methods::{
|
||||
BlobsByRangeRequest, DataColumnsByRangeRequest, OldBlocksByRangeRequest,
|
||||
OldBlocksByRangeRequestV2,
|
||||
OldBlocksByRangeRequestV2, StatusMessageV2,
|
||||
};
|
||||
use lighthouse_network::rpc::{RequestType, StatusMessage};
|
||||
use lighthouse_network::rpc::RequestType;
|
||||
use lighthouse_network::service::api_types::{
|
||||
AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId,
|
||||
SyncRequestId,
|
||||
@@ -98,6 +98,7 @@ impl TestRig {
|
||||
finalized_root,
|
||||
head_slot: finalized_epoch.start_slot(E::slots_per_epoch()),
|
||||
head_root: Hash256::random(),
|
||||
earliest_available_slot: None,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -109,22 +110,25 @@ impl TestRig {
|
||||
finalized_root: Hash256::random(),
|
||||
head_slot: finalized_epoch.start_slot(E::slots_per_epoch()),
|
||||
head_root: Hash256::random(),
|
||||
earliest_available_slot: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn local_info(&self) -> SyncInfo {
|
||||
let StatusMessage {
|
||||
let StatusMessageV2 {
|
||||
fork_digest: _,
|
||||
finalized_root,
|
||||
finalized_epoch,
|
||||
head_root,
|
||||
head_slot,
|
||||
} = self.harness.chain.status_message();
|
||||
earliest_available_slot,
|
||||
} = self.harness.chain.status_message().status_v2();
|
||||
SyncInfo {
|
||||
head_slot,
|
||||
head_root,
|
||||
finalized_epoch,
|
||||
finalized_root,
|
||||
earliest_available_slot: Some(earliest_available_slot),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user