mirror of
https://github.com/sigp/lighthouse.git
synced 2026-06-18 03:08:28 +00:00
Merge branch 'unstable' of https://github.com/sigp/lighthouse into single_attestation
This commit is contained in:
@@ -5,51 +5,51 @@ authors = ["Sigma Prime <contact@sigmaprime.io>"]
|
||||
edition = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
sloggers = { workspace = true }
|
||||
bls = { workspace = true }
|
||||
eth2 = { workspace = true }
|
||||
eth2_network_config = { workspace = true }
|
||||
genesis = { workspace = true }
|
||||
gossipsub = { workspace = true }
|
||||
kzg = { workspace = true }
|
||||
matches = "0.1.8"
|
||||
serde_json = { workspace = true }
|
||||
slog-term = { workspace = true }
|
||||
slog-async = { workspace = true }
|
||||
eth2 = { workspace = true }
|
||||
gossipsub = { workspace = true }
|
||||
eth2_network_config = { workspace = true }
|
||||
kzg = { workspace = true }
|
||||
bls = { workspace = true }
|
||||
slog-term = { workspace = true }
|
||||
sloggers = { workspace = true }
|
||||
|
||||
[dependencies]
|
||||
alloy-primitives = { workspace = true }
|
||||
async-channel = { workspace = true }
|
||||
anyhow = { workspace = true }
|
||||
beacon_chain = { workspace = true }
|
||||
store = { workspace = true }
|
||||
lighthouse_network = { workspace = true }
|
||||
types = { workspace = true }
|
||||
slot_clock = { workspace = true }
|
||||
slog = { workspace = true }
|
||||
hex = { workspace = true }
|
||||
ethereum_ssz = { workspace = true }
|
||||
ssz_types = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
tokio-stream = { workspace = true }
|
||||
smallvec = { workspace = true }
|
||||
rand = { workspace = true }
|
||||
fnv = { workspace = true }
|
||||
alloy-rlp = { workspace = true }
|
||||
metrics = { workspace = true }
|
||||
logging = { workspace = true }
|
||||
task_executor = { workspace = true }
|
||||
anyhow = { workspace = true }
|
||||
async-channel = { workspace = true }
|
||||
beacon_chain = { workspace = true }
|
||||
beacon_processor = { workspace = true }
|
||||
delay_map = { workspace = true }
|
||||
derivative = { workspace = true }
|
||||
ethereum_ssz = { workspace = true }
|
||||
execution_layer = { workspace = true }
|
||||
fnv = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
hex = { workspace = true }
|
||||
igd-next = "0.14"
|
||||
itertools = { workspace = true }
|
||||
lighthouse_network = { workspace = true }
|
||||
logging = { workspace = true }
|
||||
lru_cache = { workspace = true }
|
||||
strum = { workspace = true }
|
||||
derivative = { workspace = true }
|
||||
delay_map = { workspace = true }
|
||||
metrics = { workspace = true }
|
||||
operation_pool = { workspace = true }
|
||||
execution_layer = { workspace = true }
|
||||
beacon_processor = { workspace = true }
|
||||
parking_lot = { workspace = true }
|
||||
rand = { workspace = true }
|
||||
slog = { workspace = true }
|
||||
slot_clock = { workspace = true }
|
||||
smallvec = { workspace = true }
|
||||
ssz_types = { workspace = true }
|
||||
store = { workspace = true }
|
||||
strum = { workspace = true }
|
||||
task_executor = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
tokio-stream = { workspace = true }
|
||||
types = { workspace = true }
|
||||
|
||||
[features]
|
||||
# NOTE: This can be run via cargo build --bin lighthouse --features network/disable-backfill
|
||||
|
||||
@@ -2,7 +2,8 @@ use beacon_chain::{
|
||||
attestation_verification::Error as AttnError,
|
||||
light_client_finality_update_verification::Error as LightClientFinalityUpdateError,
|
||||
light_client_optimistic_update_verification::Error as LightClientOptimisticUpdateError,
|
||||
sync_committee_verification::Error as SyncCommitteeError,
|
||||
sync_committee_verification::Error as SyncCommitteeError, AvailabilityProcessingStatus,
|
||||
BlockError,
|
||||
};
|
||||
use fnv::FnvHashMap;
|
||||
use lighthouse_network::{
|
||||
@@ -11,12 +12,19 @@ use lighthouse_network::{
|
||||
};
|
||||
pub use metrics::*;
|
||||
use std::sync::{Arc, LazyLock};
|
||||
use strum::AsRefStr;
|
||||
use strum::IntoEnumIterator;
|
||||
use types::EthSpec;
|
||||
|
||||
pub const SUCCESS: &str = "SUCCESS";
|
||||
pub const FAILURE: &str = "FAILURE";
|
||||
|
||||
#[derive(Debug, AsRefStr)]
|
||||
pub(crate) enum BlockSource {
|
||||
Gossip,
|
||||
Rpc,
|
||||
}
|
||||
|
||||
pub static BEACON_BLOCK_MESH_PEERS_PER_CLIENT: LazyLock<Result<IntGaugeVec>> =
|
||||
LazyLock::new(|| {
|
||||
try_create_int_gauge_vec(
|
||||
@@ -59,6 +67,27 @@ pub static SYNC_COMMITTEE_SUBSCRIPTION_REQUESTS: LazyLock<Result<IntCounter>> =
|
||||
)
|
||||
});
|
||||
|
||||
/*
|
||||
* Beacon processor
|
||||
*/
|
||||
pub static BEACON_PROCESSOR_MISSING_COMPONENTS: LazyLock<Result<IntCounterVec>> = LazyLock::new(
|
||||
|| {
|
||||
try_create_int_counter_vec(
|
||||
"beacon_processor_missing_components_total",
|
||||
"Total number of imported individual block components that resulted in missing components",
|
||||
&["source", "component"],
|
||||
)
|
||||
},
|
||||
);
|
||||
pub static BEACON_PROCESSOR_IMPORT_ERRORS_PER_TYPE: LazyLock<Result<IntCounterVec>> =
|
||||
LazyLock::new(|| {
|
||||
try_create_int_counter_vec(
|
||||
"beacon_processor_import_errors_total",
|
||||
"Total number of block components that were not verified",
|
||||
&["source", "component", "type"],
|
||||
)
|
||||
});
|
||||
|
||||
/*
|
||||
* Gossip processor
|
||||
*/
|
||||
@@ -606,6 +635,37 @@ pub fn register_sync_committee_error(error: &SyncCommitteeError) {
|
||||
inc_counter_vec(&GOSSIP_SYNC_COMMITTEE_ERRORS_PER_TYPE, &[error.as_ref()]);
|
||||
}
|
||||
|
||||
pub(crate) fn register_process_result_metrics(
|
||||
result: &std::result::Result<AvailabilityProcessingStatus, BlockError>,
|
||||
source: BlockSource,
|
||||
block_component: &'static str,
|
||||
) {
|
||||
match result {
|
||||
Ok(status) => match status {
|
||||
AvailabilityProcessingStatus::Imported { .. } => match source {
|
||||
BlockSource::Gossip => {
|
||||
inc_counter(&BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL);
|
||||
}
|
||||
BlockSource::Rpc => {
|
||||
inc_counter(&BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL);
|
||||
}
|
||||
},
|
||||
AvailabilityProcessingStatus::MissingComponents { .. } => {
|
||||
inc_counter_vec(
|
||||
&BEACON_PROCESSOR_MISSING_COMPONENTS,
|
||||
&[source.as_ref(), block_component],
|
||||
);
|
||||
}
|
||||
},
|
||||
Err(error) => {
|
||||
inc_counter_vec(
|
||||
&BEACON_PROCESSOR_IMPORT_ERRORS_PER_TYPE,
|
||||
&[source.as_ref(), block_component, error.as_ref()],
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_result<T, E>(result: &std::result::Result<T, E>) -> &str {
|
||||
match result {
|
||||
Ok(_) => SUCCESS,
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use crate::{
|
||||
metrics,
|
||||
metrics::{self, register_process_result_metrics},
|
||||
network_beacon_processor::{InvalidBlockStorage, NetworkBeaconProcessor},
|
||||
service::NetworkMessage,
|
||||
sync::SyncMessage,
|
||||
@@ -710,8 +710,19 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
MessageAcceptance::Reject,
|
||||
);
|
||||
}
|
||||
GossipDataColumnError::PriorKnown { .. } => {
|
||||
// Data column is available via either the EL or reconstruction.
|
||||
// Do not penalise the peer.
|
||||
// Gossip filter should filter any duplicates received after this.
|
||||
debug!(
|
||||
self.log,
|
||||
"Received already available column sidecar. Ignoring the column sidecar";
|
||||
"slot" => %slot,
|
||||
"block_root" => %block_root,
|
||||
"index" => %index,
|
||||
)
|
||||
}
|
||||
GossipDataColumnError::FutureSlot { .. }
|
||||
| GossipDataColumnError::PriorKnown { .. }
|
||||
| GossipDataColumnError::PastFinalizedSlot { .. } => {
|
||||
debug!(
|
||||
self.log,
|
||||
@@ -852,7 +863,18 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
MessageAcceptance::Reject,
|
||||
);
|
||||
}
|
||||
GossipBlobError::FutureSlot { .. } | GossipBlobError::RepeatBlob { .. } => {
|
||||
GossipBlobError::RepeatBlob { .. } => {
|
||||
// We may have received the blob from the EL. Do not penalise the peer.
|
||||
// Gossip filter should filter any duplicates received after this.
|
||||
debug!(
|
||||
self.log,
|
||||
"Received already available blob sidecar. Ignoring the blob sidecar";
|
||||
"slot" => %slot,
|
||||
"root" => %root,
|
||||
"index" => %index,
|
||||
)
|
||||
}
|
||||
GossipBlobError::FutureSlot { .. } => {
|
||||
debug!(
|
||||
self.log,
|
||||
"Could not verify blob sidecar for gossip. Ignoring the blob sidecar";
|
||||
@@ -915,12 +937,11 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
let blob_index = verified_blob.id().index;
|
||||
|
||||
let result = self.chain.process_gossip_blob(verified_blob).await;
|
||||
register_process_result_metrics(&result, metrics::BlockSource::Gossip, "blob");
|
||||
|
||||
match &result {
|
||||
Ok(AvailabilityProcessingStatus::Imported(block_root)) => {
|
||||
// Note: Reusing block imported metric here
|
||||
metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL);
|
||||
debug!(
|
||||
info!(
|
||||
self.log,
|
||||
"Gossipsub blob processed - imported fully available block";
|
||||
"block_root" => %block_root
|
||||
@@ -989,43 +1010,39 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
let data_column_slot = verified_data_column.slot();
|
||||
let data_column_index = verified_data_column.id().index;
|
||||
|
||||
match self
|
||||
let result = self
|
||||
.chain
|
||||
.process_gossip_data_columns(vec![verified_data_column], || Ok(()))
|
||||
.await
|
||||
{
|
||||
Ok(availability) => {
|
||||
match availability {
|
||||
AvailabilityProcessingStatus::Imported(block_root) => {
|
||||
// Note: Reusing block imported metric here
|
||||
metrics::inc_counter(
|
||||
&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL,
|
||||
);
|
||||
info!(
|
||||
self.log,
|
||||
"Gossipsub data column processed, imported fully available block";
|
||||
"block_root" => %block_root
|
||||
);
|
||||
self.chain.recompute_head_at_current_slot().await;
|
||||
.await;
|
||||
register_process_result_metrics(&result, metrics::BlockSource::Gossip, "data_column");
|
||||
|
||||
metrics::set_gauge(
|
||||
&metrics::BEACON_BLOB_DELAY_FULL_VERIFICATION,
|
||||
processing_start_time.elapsed().as_millis() as i64,
|
||||
);
|
||||
}
|
||||
AvailabilityProcessingStatus::MissingComponents(slot, block_root) => {
|
||||
trace!(
|
||||
self.log,
|
||||
"Processed data column, waiting for other components";
|
||||
"slot" => %slot,
|
||||
"data_column_index" => %data_column_index,
|
||||
"block_root" => %block_root,
|
||||
);
|
||||
match result {
|
||||
Ok(availability) => match availability {
|
||||
AvailabilityProcessingStatus::Imported(block_root) => {
|
||||
info!(
|
||||
self.log,
|
||||
"Gossipsub data column processed, imported fully available block";
|
||||
"block_root" => %block_root
|
||||
);
|
||||
self.chain.recompute_head_at_current_slot().await;
|
||||
|
||||
self.attempt_data_column_reconstruction(block_root).await;
|
||||
}
|
||||
metrics::set_gauge(
|
||||
&metrics::BEACON_BLOB_DELAY_FULL_VERIFICATION,
|
||||
processing_start_time.elapsed().as_millis() as i64,
|
||||
);
|
||||
}
|
||||
}
|
||||
AvailabilityProcessingStatus::MissingComponents(slot, block_root) => {
|
||||
trace!(
|
||||
self.log,
|
||||
"Processed data column, waiting for other components";
|
||||
"slot" => %slot,
|
||||
"data_column_index" => %data_column_index,
|
||||
"block_root" => %block_root,
|
||||
);
|
||||
|
||||
self.attempt_data_column_reconstruction(block_root).await;
|
||||
}
|
||||
},
|
||||
Err(BlockError::DuplicateFullyImported(_)) => {
|
||||
debug!(
|
||||
self.log,
|
||||
@@ -1467,11 +1484,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
NotifyExecutionLayer::Yes,
|
||||
)
|
||||
.await;
|
||||
register_process_result_metrics(&result, metrics::BlockSource::Gossip, "block");
|
||||
|
||||
match &result {
|
||||
Ok(AvailabilityProcessingStatus::Imported(block_root)) => {
|
||||
metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL);
|
||||
|
||||
if reprocess_tx
|
||||
.try_send(ReprocessQueueMessage::BlockImported {
|
||||
block_root: *block_root,
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::metrics;
|
||||
use crate::metrics::{self, register_process_result_metrics};
|
||||
use crate::network_beacon_processor::{NetworkBeaconProcessor, FUTURE_SLOT_TOLERANCE};
|
||||
use crate::sync::BatchProcessResult;
|
||||
use crate::sync::{
|
||||
@@ -163,8 +163,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
NotifyExecutionLayer::Yes,
|
||||
)
|
||||
.await;
|
||||
|
||||
metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL);
|
||||
register_process_result_metrics(&result, metrics::BlockSource::Rpc, "block");
|
||||
|
||||
// RPC block imported, regardless of process type
|
||||
match result.as_ref() {
|
||||
@@ -286,6 +285,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
}
|
||||
|
||||
let result = self.chain.process_rpc_blobs(slot, block_root, blobs).await;
|
||||
register_process_result_metrics(&result, metrics::BlockSource::Rpc, "blobs");
|
||||
|
||||
match &result {
|
||||
Ok(AvailabilityProcessingStatus::Imported(hash)) => {
|
||||
@@ -343,6 +343,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
.chain
|
||||
.process_rpc_custody_columns(custody_columns)
|
||||
.await;
|
||||
register_process_result_metrics(&result, metrics::BlockSource::Rpc, "custody_columns");
|
||||
|
||||
match &result {
|
||||
Ok(availability) => match availability {
|
||||
|
||||
@@ -527,7 +527,7 @@ impl TestRig {
|
||||
self.assert_event_journal(
|
||||
&expected
|
||||
.iter()
|
||||
.map(|ev| Into::<&'static str>::into(ev))
|
||||
.map(Into::<&'static str>::into)
|
||||
.chain(std::iter::once(WORKER_FREED))
|
||||
.chain(std::iter::once(NOTHING_TO_DO))
|
||||
.collect::<Vec<_>>(),
|
||||
|
||||
@@ -1,235 +1,229 @@
|
||||
#[cfg(not(debug_assertions))]
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::persisted_dht::load_dht;
|
||||
use crate::{NetworkConfig, NetworkService};
|
||||
use beacon_chain::test_utils::BeaconChainHarness;
|
||||
use beacon_chain::BeaconChainTypes;
|
||||
use beacon_processor::{BeaconProcessorChannels, BeaconProcessorConfig};
|
||||
use futures::StreamExt;
|
||||
use lighthouse_network::types::{GossipEncoding, GossipKind};
|
||||
use lighthouse_network::{Enr, GossipTopic};
|
||||
use slog::{o, Drain, Level, Logger};
|
||||
use sloggers::{null::NullLoggerBuilder, Build};
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use tokio::runtime::Runtime;
|
||||
use types::{Epoch, EthSpec, ForkName, MinimalEthSpec, SubnetId};
|
||||
#![cfg(not(debug_assertions))]
|
||||
#![cfg(test)]
|
||||
use crate::persisted_dht::load_dht;
|
||||
use crate::{NetworkConfig, NetworkService};
|
||||
use beacon_chain::test_utils::BeaconChainHarness;
|
||||
use beacon_chain::BeaconChainTypes;
|
||||
use beacon_processor::{BeaconProcessorChannels, BeaconProcessorConfig};
|
||||
use futures::StreamExt;
|
||||
use lighthouse_network::types::{GossipEncoding, GossipKind};
|
||||
use lighthouse_network::{Enr, GossipTopic};
|
||||
use slog::{o, Drain, Level, Logger};
|
||||
use sloggers::{null::NullLoggerBuilder, Build};
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use tokio::runtime::Runtime;
|
||||
use types::{Epoch, EthSpec, ForkName, MinimalEthSpec, SubnetId};
|
||||
|
||||
impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
fn get_topic_params(&self, topic: GossipTopic) -> Option<&gossipsub::TopicScoreParams> {
|
||||
self.libp2p.get_topic_params(topic)
|
||||
}
|
||||
impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
fn get_topic_params(&self, topic: GossipTopic) -> Option<&gossipsub::TopicScoreParams> {
|
||||
self.libp2p.get_topic_params(topic)
|
||||
}
|
||||
}
|
||||
|
||||
fn get_logger(actual_log: bool) -> Logger {
|
||||
if actual_log {
|
||||
let drain = {
|
||||
let decorator = slog_term::TermDecorator::new().build();
|
||||
let decorator =
|
||||
logging::AlignedTermDecorator::new(decorator, logging::MAX_MESSAGE_WIDTH);
|
||||
let drain = slog_term::FullFormat::new(decorator).build().fuse();
|
||||
let drain = slog_async::Async::new(drain).chan_size(2048).build();
|
||||
drain.filter_level(Level::Debug)
|
||||
};
|
||||
fn get_logger(actual_log: bool) -> Logger {
|
||||
if actual_log {
|
||||
let drain = {
|
||||
let decorator = slog_term::TermDecorator::new().build();
|
||||
let decorator =
|
||||
logging::AlignedTermDecorator::new(decorator, logging::MAX_MESSAGE_WIDTH);
|
||||
let drain = slog_term::FullFormat::new(decorator).build().fuse();
|
||||
let drain = slog_async::Async::new(drain).chan_size(2048).build();
|
||||
drain.filter_level(Level::Debug)
|
||||
};
|
||||
|
||||
Logger::root(drain.fuse(), o!())
|
||||
} else {
|
||||
let builder = NullLoggerBuilder;
|
||||
builder.build().expect("should build logger")
|
||||
}
|
||||
Logger::root(drain.fuse(), o!())
|
||||
} else {
|
||||
let builder = NullLoggerBuilder;
|
||||
builder.build().expect("should build logger")
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_dht_persistence() {
|
||||
let log = get_logger(false);
|
||||
#[test]
|
||||
fn test_dht_persistence() {
|
||||
let log = get_logger(false);
|
||||
|
||||
let beacon_chain = BeaconChainHarness::builder(MinimalEthSpec)
|
||||
.default_spec()
|
||||
.deterministic_keypairs(8)
|
||||
.fresh_ephemeral_store()
|
||||
.build()
|
||||
.chain;
|
||||
let beacon_chain = BeaconChainHarness::builder(MinimalEthSpec)
|
||||
.default_spec()
|
||||
.deterministic_keypairs(8)
|
||||
.fresh_ephemeral_store()
|
||||
.build()
|
||||
.chain;
|
||||
|
||||
let store = beacon_chain.store.clone();
|
||||
let store = beacon_chain.store.clone();
|
||||
|
||||
let enr1 = Enr::from_str("enr:-IS4QHCYrYZbAKWCBRlAy5zzaDZXJBGkcnh4MHcBFZntXNFrdvJjX04jRzjzCBOonrkTfj499SZuOh8R33Ls8RRcy5wBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQPKY0yuDUmstAHYpMa2_oxVtw0RW_QAdpzBQA8yWM0xOIN1ZHCCdl8").unwrap();
|
||||
let enr2 = Enr::from_str("enr:-IS4QJ2d11eu6dC7E7LoXeLMgMP3kom1u3SE8esFSWvaHoo0dP1jg8O3-nx9ht-EO3CmG7L6OkHcMmoIh00IYWB92QABgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQIB_c-jQMOXsbjWkbN-Oj99H57gfId5pfb4wa1qxwV4CIN1ZHCCIyk").unwrap();
|
||||
let enrs = vec![enr1, enr2];
|
||||
let enr1 = Enr::from_str("enr:-IS4QHCYrYZbAKWCBRlAy5zzaDZXJBGkcnh4MHcBFZntXNFrdvJjX04jRzjzCBOonrkTfj499SZuOh8R33Ls8RRcy5wBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQPKY0yuDUmstAHYpMa2_oxVtw0RW_QAdpzBQA8yWM0xOIN1ZHCCdl8").unwrap();
|
||||
let enr2 = Enr::from_str("enr:-IS4QJ2d11eu6dC7E7LoXeLMgMP3kom1u3SE8esFSWvaHoo0dP1jg8O3-nx9ht-EO3CmG7L6OkHcMmoIh00IYWB92QABgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQIB_c-jQMOXsbjWkbN-Oj99H57gfId5pfb4wa1qxwV4CIN1ZHCCIyk").unwrap();
|
||||
let enrs = vec![enr1, enr2];
|
||||
|
||||
let runtime = Arc::new(Runtime::new().unwrap());
|
||||
let runtime = Arc::new(Runtime::new().unwrap());
|
||||
|
||||
let (signal, exit) = async_channel::bounded(1);
|
||||
let (signal, exit) = async_channel::bounded(1);
|
||||
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
|
||||
let executor =
|
||||
task_executor::TaskExecutor::new(Arc::downgrade(&runtime), exit, log.clone(), shutdown_tx);
|
||||
|
||||
let mut config = NetworkConfig::default();
|
||||
config.set_ipv4_listening_address(std::net::Ipv4Addr::UNSPECIFIED, 21212, 21212, 21213);
|
||||
config.discv5_config.table_filter = |_| true; // Do not ignore local IPs
|
||||
config.upnp_enabled = false;
|
||||
config.boot_nodes_enr = enrs.clone();
|
||||
let config = Arc::new(config);
|
||||
runtime.block_on(async move {
|
||||
// Create a new network service which implicitly gets dropped at the
|
||||
// end of the block.
|
||||
|
||||
let BeaconProcessorChannels {
|
||||
beacon_processor_tx,
|
||||
beacon_processor_rx: _beacon_processor_rx,
|
||||
work_reprocessing_tx,
|
||||
work_reprocessing_rx: _work_reprocessing_rx,
|
||||
} = <_>::default();
|
||||
|
||||
let _network_service = NetworkService::start(
|
||||
beacon_chain.clone(),
|
||||
config,
|
||||
executor,
|
||||
None,
|
||||
beacon_processor_tx,
|
||||
work_reprocessing_tx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
drop(signal);
|
||||
});
|
||||
|
||||
let raw_runtime = Arc::try_unwrap(runtime).unwrap();
|
||||
raw_runtime.shutdown_timeout(tokio::time::Duration::from_secs(300));
|
||||
|
||||
// Load the persisted dht from the store
|
||||
let persisted_enrs = load_dht(store);
|
||||
assert!(
|
||||
persisted_enrs.contains(&enrs[0]),
|
||||
"should have persisted the first ENR to store"
|
||||
);
|
||||
assert!(
|
||||
persisted_enrs.contains(&enrs[1]),
|
||||
"should have persisted the second ENR to store"
|
||||
);
|
||||
}
|
||||
|
||||
// Test removing topic weight on old topics when a fork happens.
|
||||
#[test]
|
||||
fn test_removing_topic_weight_on_old_topics() {
|
||||
let runtime = Arc::new(Runtime::new().unwrap());
|
||||
|
||||
// Capella spec
|
||||
let mut spec = MinimalEthSpec::default_spec();
|
||||
spec.altair_fork_epoch = Some(Epoch::new(0));
|
||||
spec.bellatrix_fork_epoch = Some(Epoch::new(0));
|
||||
spec.capella_fork_epoch = Some(Epoch::new(1));
|
||||
|
||||
// Build beacon chain.
|
||||
let beacon_chain = BeaconChainHarness::builder(MinimalEthSpec)
|
||||
.spec(spec.clone().into())
|
||||
.deterministic_keypairs(8)
|
||||
.fresh_ephemeral_store()
|
||||
.mock_execution_layer()
|
||||
.build()
|
||||
.chain;
|
||||
let (next_fork_name, _) = beacon_chain.duration_to_next_fork().expect("next fork");
|
||||
assert_eq!(next_fork_name, ForkName::Capella);
|
||||
|
||||
// Build network service.
|
||||
let (mut network_service, network_globals, _network_senders) = runtime.block_on(async {
|
||||
let (_, exit) = async_channel::bounded(1);
|
||||
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
|
||||
let executor = task_executor::TaskExecutor::new(
|
||||
Arc::downgrade(&runtime),
|
||||
exit,
|
||||
log.clone(),
|
||||
get_logger(false),
|
||||
shutdown_tx,
|
||||
);
|
||||
|
||||
let mut config = NetworkConfig::default();
|
||||
config.set_ipv4_listening_address(std::net::Ipv4Addr::UNSPECIFIED, 21212, 21212, 21213);
|
||||
config.set_ipv4_listening_address(std::net::Ipv4Addr::UNSPECIFIED, 21214, 21214, 21215);
|
||||
config.discv5_config.table_filter = |_| true; // Do not ignore local IPs
|
||||
config.upnp_enabled = false;
|
||||
config.boot_nodes_enr = enrs.clone();
|
||||
let config = Arc::new(config);
|
||||
runtime.block_on(async move {
|
||||
// Create a new network service which implicitly gets dropped at the
|
||||
// end of the block.
|
||||
|
||||
let BeaconProcessorChannels {
|
||||
beacon_processor_tx,
|
||||
beacon_processor_rx: _beacon_processor_rx,
|
||||
work_reprocessing_tx,
|
||||
work_reprocessing_rx: _work_reprocessing_rx,
|
||||
} = <_>::default();
|
||||
let beacon_processor_channels =
|
||||
BeaconProcessorChannels::new(&BeaconProcessorConfig::default());
|
||||
NetworkService::build(
|
||||
beacon_chain.clone(),
|
||||
config,
|
||||
executor.clone(),
|
||||
None,
|
||||
beacon_processor_channels.beacon_processor_tx,
|
||||
beacon_processor_channels.work_reprocessing_tx,
|
||||
)
|
||||
.await
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
let _network_service = NetworkService::start(
|
||||
beacon_chain.clone(),
|
||||
config,
|
||||
executor,
|
||||
None,
|
||||
beacon_processor_tx,
|
||||
work_reprocessing_tx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
drop(signal);
|
||||
});
|
||||
|
||||
let raw_runtime = Arc::try_unwrap(runtime).unwrap();
|
||||
raw_runtime.shutdown_timeout(tokio::time::Duration::from_secs(300));
|
||||
|
||||
// Load the persisted dht from the store
|
||||
let persisted_enrs = load_dht(store);
|
||||
assert!(
|
||||
persisted_enrs.contains(&enrs[0]),
|
||||
"should have persisted the first ENR to store"
|
||||
);
|
||||
assert!(
|
||||
persisted_enrs.contains(&enrs[1]),
|
||||
"should have persisted the second ENR to store"
|
||||
);
|
||||
}
|
||||
|
||||
// Test removing topic weight on old topics when a fork happens.
|
||||
#[test]
|
||||
fn test_removing_topic_weight_on_old_topics() {
|
||||
let runtime = Arc::new(Runtime::new().unwrap());
|
||||
|
||||
// Capella spec
|
||||
let mut spec = MinimalEthSpec::default_spec();
|
||||
spec.altair_fork_epoch = Some(Epoch::new(0));
|
||||
spec.bellatrix_fork_epoch = Some(Epoch::new(0));
|
||||
spec.capella_fork_epoch = Some(Epoch::new(1));
|
||||
|
||||
// Build beacon chain.
|
||||
let beacon_chain = BeaconChainHarness::builder(MinimalEthSpec)
|
||||
.spec(spec.clone().into())
|
||||
.deterministic_keypairs(8)
|
||||
.fresh_ephemeral_store()
|
||||
.mock_execution_layer()
|
||||
.build()
|
||||
.chain;
|
||||
let (next_fork_name, _) = beacon_chain.duration_to_next_fork().expect("next fork");
|
||||
assert_eq!(next_fork_name, ForkName::Capella);
|
||||
|
||||
// Build network service.
|
||||
let (mut network_service, network_globals, _network_senders) = runtime.block_on(async {
|
||||
let (_, exit) = async_channel::bounded(1);
|
||||
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
|
||||
let executor = task_executor::TaskExecutor::new(
|
||||
Arc::downgrade(&runtime),
|
||||
exit,
|
||||
get_logger(false),
|
||||
shutdown_tx,
|
||||
);
|
||||
|
||||
let mut config = NetworkConfig::default();
|
||||
config.set_ipv4_listening_address(std::net::Ipv4Addr::UNSPECIFIED, 21214, 21214, 21215);
|
||||
config.discv5_config.table_filter = |_| true; // Do not ignore local IPs
|
||||
config.upnp_enabled = false;
|
||||
let config = Arc::new(config);
|
||||
|
||||
let beacon_processor_channels =
|
||||
BeaconProcessorChannels::new(&BeaconProcessorConfig::default());
|
||||
NetworkService::build(
|
||||
beacon_chain.clone(),
|
||||
config,
|
||||
executor.clone(),
|
||||
None,
|
||||
beacon_processor_channels.beacon_processor_tx,
|
||||
beacon_processor_channels.work_reprocessing_tx,
|
||||
)
|
||||
.await
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
// Subscribe to the topics.
|
||||
runtime.block_on(async {
|
||||
while network_globals.gossipsub_subscriptions.read().len() < 2 {
|
||||
if let Some(msg) = network_service.subnet_service.next().await {
|
||||
network_service.on_subnet_service_msg(msg);
|
||||
}
|
||||
// Subscribe to the topics.
|
||||
runtime.block_on(async {
|
||||
while network_globals.gossipsub_subscriptions.read().len() < 2 {
|
||||
if let Some(msg) = network_service.subnet_service.next().await {
|
||||
network_service.on_subnet_service_msg(msg);
|
||||
}
|
||||
});
|
||||
|
||||
// Make sure the service is subscribed to the topics.
|
||||
let (old_topic1, old_topic2) = {
|
||||
let mut subnets = SubnetId::compute_attestation_subnets(
|
||||
network_globals.local_enr().node_id().raw(),
|
||||
&spec,
|
||||
)
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(2, subnets.len());
|
||||
|
||||
let old_fork_digest = beacon_chain.enr_fork_id().fork_digest;
|
||||
let old_topic1 = GossipTopic::new(
|
||||
GossipKind::Attestation(subnets.pop().unwrap()),
|
||||
GossipEncoding::SSZSnappy,
|
||||
old_fork_digest,
|
||||
);
|
||||
let old_topic2 = GossipTopic::new(
|
||||
GossipKind::Attestation(subnets.pop().unwrap()),
|
||||
GossipEncoding::SSZSnappy,
|
||||
old_fork_digest,
|
||||
);
|
||||
|
||||
(old_topic1, old_topic2)
|
||||
};
|
||||
let subscriptions = network_globals.gossipsub_subscriptions.read().clone();
|
||||
assert_eq!(2, subscriptions.len());
|
||||
assert!(subscriptions.contains(&old_topic1));
|
||||
assert!(subscriptions.contains(&old_topic2));
|
||||
let old_topic_params1 = network_service
|
||||
.get_topic_params(old_topic1.clone())
|
||||
.expect("topic score params");
|
||||
assert!(old_topic_params1.topic_weight > 0.0);
|
||||
let old_topic_params2 = network_service
|
||||
.get_topic_params(old_topic2.clone())
|
||||
.expect("topic score params");
|
||||
assert!(old_topic_params2.topic_weight > 0.0);
|
||||
|
||||
// Advance slot to the next fork
|
||||
for _ in 0..MinimalEthSpec::slots_per_epoch() {
|
||||
beacon_chain.slot_clock.advance_slot();
|
||||
}
|
||||
});
|
||||
|
||||
// Run `NetworkService::update_next_fork()`.
|
||||
runtime.block_on(async {
|
||||
network_service.update_next_fork();
|
||||
});
|
||||
// Make sure the service is subscribed to the topics.
|
||||
let (old_topic1, old_topic2) = {
|
||||
let mut subnets = SubnetId::compute_attestation_subnets(
|
||||
network_globals.local_enr().node_id().raw(),
|
||||
&spec,
|
||||
)
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(2, subnets.len());
|
||||
|
||||
// Check that topic_weight on the old topics has been zeroed.
|
||||
let old_topic_params1 = network_service
|
||||
.get_topic_params(old_topic1)
|
||||
.expect("topic score params");
|
||||
assert_eq!(0.0, old_topic_params1.topic_weight);
|
||||
let old_fork_digest = beacon_chain.enr_fork_id().fork_digest;
|
||||
let old_topic1 = GossipTopic::new(
|
||||
GossipKind::Attestation(subnets.pop().unwrap()),
|
||||
GossipEncoding::SSZSnappy,
|
||||
old_fork_digest,
|
||||
);
|
||||
let old_topic2 = GossipTopic::new(
|
||||
GossipKind::Attestation(subnets.pop().unwrap()),
|
||||
GossipEncoding::SSZSnappy,
|
||||
old_fork_digest,
|
||||
);
|
||||
|
||||
let old_topic_params2 = network_service
|
||||
.get_topic_params(old_topic2)
|
||||
.expect("topic score params");
|
||||
assert_eq!(0.0, old_topic_params2.topic_weight);
|
||||
(old_topic1, old_topic2)
|
||||
};
|
||||
let subscriptions = network_globals.gossipsub_subscriptions.read().clone();
|
||||
assert_eq!(2, subscriptions.len());
|
||||
assert!(subscriptions.contains(&old_topic1));
|
||||
assert!(subscriptions.contains(&old_topic2));
|
||||
let old_topic_params1 = network_service
|
||||
.get_topic_params(old_topic1.clone())
|
||||
.expect("topic score params");
|
||||
assert!(old_topic_params1.topic_weight > 0.0);
|
||||
let old_topic_params2 = network_service
|
||||
.get_topic_params(old_topic2.clone())
|
||||
.expect("topic score params");
|
||||
assert!(old_topic_params2.topic_weight > 0.0);
|
||||
|
||||
// Advance slot to the next fork
|
||||
for _ in 0..MinimalEthSpec::slots_per_epoch() {
|
||||
beacon_chain.slot_clock.advance_slot();
|
||||
}
|
||||
|
||||
// Run `NetworkService::update_next_fork()`.
|
||||
runtime.block_on(async {
|
||||
network_service.update_next_fork();
|
||||
});
|
||||
|
||||
// Check that topic_weight on the old topics has been zeroed.
|
||||
let old_topic_params1 = network_service
|
||||
.get_topic_params(old_topic1)
|
||||
.expect("topic score params");
|
||||
assert_eq!(0.0, old_topic_params1.topic_weight);
|
||||
|
||||
let old_topic_params2 = network_service
|
||||
.get_topic_params(old_topic2)
|
||||
.expect("topic score params");
|
||||
assert_eq!(0.0, old_topic_params2.topic_weight);
|
||||
}
|
||||
|
||||
@@ -86,7 +86,7 @@ pub struct SubnetService<T: BeaconChainTypes> {
|
||||
subscriptions: HashSetDelay<Subnet>,
|
||||
|
||||
/// Subscriptions that need to be executed in the future.
|
||||
scheduled_subscriptions: HashSetDelay<Subnet>,
|
||||
scheduled_subscriptions: HashSetDelay<ExactSubnet>,
|
||||
|
||||
/// A list of permanent subnets that this node is subscribed to.
|
||||
// TODO: Shift this to a dynamic bitfield
|
||||
@@ -484,8 +484,10 @@ impl<T: BeaconChainTypes> SubnetService<T> {
|
||||
self.subscribe_to_subnet_immediately(subnet, slot + 1)?;
|
||||
} else {
|
||||
// This is a future slot, schedule subscribing.
|
||||
// We need to include the slot to make the key unique to prevent overwriting the entry
|
||||
// for the same subnet.
|
||||
self.scheduled_subscriptions
|
||||
.insert_at(subnet, time_to_subscription_start);
|
||||
.insert_at(ExactSubnet { subnet, slot }, time_to_subscription_start);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -626,7 +628,8 @@ impl<T: BeaconChainTypes> Stream for SubnetService<T> {
|
||||
// Process scheduled subscriptions that might be ready, since those can extend a soon to
|
||||
// expire subscription.
|
||||
match self.scheduled_subscriptions.poll_next_unpin(cx) {
|
||||
Poll::Ready(Some(Ok(subnet))) => {
|
||||
Poll::Ready(Some(Ok(exact_subnet))) => {
|
||||
let ExactSubnet { subnet, .. } = exact_subnet;
|
||||
let current_slot = self.beacon_chain.slot_clock.now().unwrap_or_default();
|
||||
if let Err(e) = self.subscribe_to_subnet_immediately(subnet, current_slot + 1) {
|
||||
debug!(self.log, "Failed to subscribe to short lived subnet"; "subnet" => ?subnet, "err" => e);
|
||||
|
||||
@@ -500,12 +500,15 @@ mod test {
|
||||
// subscription config
|
||||
let committee_count = 1;
|
||||
|
||||
// Makes 2 validator subscriptions to the same subnet but at different slots.
|
||||
// There should be just 1 unsubscription event for the later slot subscription (subscription_slot2).
|
||||
// Makes 3 validator subscriptions to the same subnet but at different slots.
|
||||
// There should be just 1 unsubscription event for each of the later slots subscriptions
|
||||
// (subscription_slot2 and subscription_slot3).
|
||||
let subscription_slot1 = 0;
|
||||
let subscription_slot2 = MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD + 4;
|
||||
let subscription_slot3 = subscription_slot2 * 2;
|
||||
let com1 = MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD + 4;
|
||||
let com2 = 0;
|
||||
let com3 = CHAIN.chain.spec.attestation_subnet_count - com1;
|
||||
|
||||
// create the attestation service and subscriptions
|
||||
let mut subnet_service = get_subnet_service();
|
||||
@@ -532,6 +535,13 @@ mod test {
|
||||
true,
|
||||
);
|
||||
|
||||
let sub3 = get_subscription(
|
||||
com3,
|
||||
current_slot + Slot::new(subscription_slot3),
|
||||
committee_count,
|
||||
true,
|
||||
);
|
||||
|
||||
let subnet_id1 = SubnetId::compute_subnet::<MainnetEthSpec>(
|
||||
current_slot + Slot::new(subscription_slot1),
|
||||
com1,
|
||||
@@ -548,12 +558,23 @@ mod test {
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let subnet_id3 = SubnetId::compute_subnet::<MainnetEthSpec>(
|
||||
current_slot + Slot::new(subscription_slot3),
|
||||
com3,
|
||||
committee_count,
|
||||
&subnet_service.beacon_chain.spec,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// Assert that subscriptions are different but their subnet is the same
|
||||
assert_ne!(sub1, sub2);
|
||||
assert_ne!(sub1, sub3);
|
||||
assert_ne!(sub2, sub3);
|
||||
assert_eq!(subnet_id1, subnet_id2);
|
||||
assert_eq!(subnet_id1, subnet_id3);
|
||||
|
||||
// submit the subscriptions
|
||||
subnet_service.validator_subscriptions(vec![sub1, sub2].into_iter());
|
||||
subnet_service.validator_subscriptions(vec![sub1, sub2, sub3].into_iter());
|
||||
|
||||
// Unsubscription event should happen at the end of the slot.
|
||||
// We wait for 2 slots, to avoid timeout issues
|
||||
@@ -590,10 +611,36 @@ mod test {
|
||||
// If the permanent and short lived subnets are different, we should get an unsubscription event.
|
||||
if !subnet_service.is_subscribed(&Subnet::Attestation(subnet_id1)) {
|
||||
assert_eq!(
|
||||
[expected_subscription, expected_unsubscription],
|
||||
[
|
||||
expected_subscription.clone(),
|
||||
expected_unsubscription.clone(),
|
||||
],
|
||||
second_subscribe_event[..]
|
||||
);
|
||||
}
|
||||
|
||||
let subscription_slot = current_slot + subscription_slot3 - 1;
|
||||
|
||||
let wait_slots = subnet_service
|
||||
.beacon_chain
|
||||
.slot_clock
|
||||
.duration_to_slot(subscription_slot)
|
||||
.unwrap()
|
||||
.as_millis() as u64
|
||||
/ SLOT_DURATION_MILLIS;
|
||||
|
||||
let no_events = dbg!(get_events(&mut subnet_service, None, wait_slots as u32).await);
|
||||
|
||||
assert_eq!(no_events, []);
|
||||
|
||||
let third_subscribe_event = get_events(&mut subnet_service, None, 2).await;
|
||||
|
||||
if !subnet_service.is_subscribed(&Subnet::Attestation(subnet_id1)) {
|
||||
assert_eq!(
|
||||
[expected_subscription, expected_unsubscription],
|
||||
third_subscribe_event[..]
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -171,7 +171,10 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
self.awaiting_parent.is_some()
|
||||
|| self.block_request_state.state.is_awaiting_event()
|
||||
|| match &self.component_requests {
|
||||
ComponentRequests::WaitingForBlock => true,
|
||||
// If components are waiting for the block request to complete, here we should
|
||||
// check if the`block_request_state.state.is_awaiting_event(). However we already
|
||||
// checked that above, so `WaitingForBlock => false` is equivalent.
|
||||
ComponentRequests::WaitingForBlock => false,
|
||||
ComponentRequests::ActiveBlobRequest(request, _) => {
|
||||
request.state.is_awaiting_event()
|
||||
}
|
||||
|
||||
@@ -362,6 +362,16 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
self.sampling.get_request_status(block_root, index)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn range_sync_state(&self) -> super::range_sync::SyncChainStatus {
|
||||
self.range_sync.state()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn update_execution_engine_state(&mut self, state: EngineState) {
|
||||
self.handle_new_execution_engine_state(state);
|
||||
}
|
||||
|
||||
fn network_globals(&self) -> &NetworkGlobals<T::EthSpec> {
|
||||
self.network.network_globals()
|
||||
}
|
||||
|
||||
@@ -763,8 +763,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
let requester = CustodyRequester(id);
|
||||
let mut request = ActiveCustodyRequest::new(
|
||||
block_root,
|
||||
// TODO(das): req_id is duplicated here, also present in id
|
||||
CustodyId { requester, req_id },
|
||||
CustodyId { requester },
|
||||
&custody_indexes_to_fetch,
|
||||
self.log.clone(),
|
||||
);
|
||||
|
||||
@@ -1,13 +0,0 @@
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
use types::Hash256;
|
||||
|
||||
/// Trait that helps maintain RangeSync's implementation split from the BeaconChain
|
||||
pub trait BlockStorage {
|
||||
fn is_block_known(&self, block_root: &Hash256) -> bool;
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes> BlockStorage for BeaconChain<T> {
|
||||
fn is_block_known(&self, block_root: &Hash256) -> bool {
|
||||
self.block_is_known_to_fork_choice(block_root)
|
||||
}
|
||||
}
|
||||
@@ -3,12 +3,11 @@
|
||||
//! Each chain type is stored in it's own map. A variety of helper functions are given along with
|
||||
//! this struct to simplify the logic of the other layers of sync.
|
||||
|
||||
use super::block_storage::BlockStorage;
|
||||
use super::chain::{ChainId, ProcessingResult, RemoveChain, SyncingChain};
|
||||
use super::sync_type::RangeSyncType;
|
||||
use crate::metrics;
|
||||
use crate::sync::network_context::SyncNetworkContext;
|
||||
use beacon_chain::BeaconChainTypes;
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
use fnv::FnvHashMap;
|
||||
use lighthouse_network::PeerId;
|
||||
use lighthouse_network::SyncInfo;
|
||||
@@ -37,10 +36,13 @@ pub enum RangeSyncState {
|
||||
Idle,
|
||||
}
|
||||
|
||||
pub type SyncChainStatus =
|
||||
Result<Option<(RangeSyncType, Slot /* from */, Slot /* to */)>, &'static str>;
|
||||
|
||||
/// A collection of finalized and head chains currently being processed.
|
||||
pub struct ChainCollection<T: BeaconChainTypes, C> {
|
||||
pub struct ChainCollection<T: BeaconChainTypes> {
|
||||
/// The beacon chain for processing.
|
||||
beacon_chain: Arc<C>,
|
||||
beacon_chain: Arc<BeaconChain<T>>,
|
||||
/// The set of finalized chains being synced.
|
||||
finalized_chains: FnvHashMap<ChainId, SyncingChain<T>>,
|
||||
/// The set of head chains being synced.
|
||||
@@ -51,8 +53,8 @@ pub struct ChainCollection<T: BeaconChainTypes, C> {
|
||||
log: slog::Logger,
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes, C: BlockStorage> ChainCollection<T, C> {
|
||||
pub fn new(beacon_chain: Arc<C>, log: slog::Logger) -> Self {
|
||||
impl<T: BeaconChainTypes> ChainCollection<T> {
|
||||
pub fn new(beacon_chain: Arc<BeaconChain<T>>, log: slog::Logger) -> Self {
|
||||
ChainCollection {
|
||||
beacon_chain,
|
||||
finalized_chains: FnvHashMap::default(),
|
||||
@@ -213,9 +215,7 @@ impl<T: BeaconChainTypes, C: BlockStorage> ChainCollection<T, C> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn state(
|
||||
&self,
|
||||
) -> Result<Option<(RangeSyncType, Slot /* from */, Slot /* to */)>, &'static str> {
|
||||
pub fn state(&self) -> SyncChainStatus {
|
||||
match self.state {
|
||||
RangeSyncState::Finalized(ref syncing_id) => {
|
||||
let chain = self
|
||||
@@ -409,7 +409,8 @@ impl<T: BeaconChainTypes, C: BlockStorage> ChainCollection<T, C> {
|
||||
let log_ref = &self.log;
|
||||
|
||||
let is_outdated = |target_slot: &Slot, target_root: &Hash256| {
|
||||
target_slot <= &local_finalized_slot || beacon_chain.is_block_known(target_root)
|
||||
target_slot <= &local_finalized_slot
|
||||
|| beacon_chain.block_is_known_to_fork_choice(target_root)
|
||||
};
|
||||
|
||||
// Retain only head peers that remain relevant
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
//! peers.
|
||||
|
||||
mod batch;
|
||||
mod block_storage;
|
||||
mod chain;
|
||||
mod chain_collection;
|
||||
mod range;
|
||||
@@ -13,5 +12,7 @@ pub use batch::{
|
||||
ByRangeRequestType,
|
||||
};
|
||||
pub use chain::{BatchId, ChainId, EPOCHS_PER_BATCH};
|
||||
#[cfg(test)]
|
||||
pub use chain_collection::SyncChainStatus;
|
||||
pub use range::RangeSync;
|
||||
pub use sync_type::RangeSyncType;
|
||||
|
||||
@@ -39,9 +39,8 @@
|
||||
//! Each chain is downloaded in batches of blocks. The batched blocks are processed sequentially
|
||||
//! and further batches are requested as current blocks are being processed.
|
||||
|
||||
use super::block_storage::BlockStorage;
|
||||
use super::chain::{BatchId, ChainId, RemoveChain, SyncingChain};
|
||||
use super::chain_collection::ChainCollection;
|
||||
use super::chain_collection::{ChainCollection, SyncChainStatus};
|
||||
use super::sync_type::RangeSyncType;
|
||||
use crate::metrics;
|
||||
use crate::status::ToStatusMessage;
|
||||
@@ -56,7 +55,7 @@ use lru_cache::LRUTimeCache;
|
||||
use slog::{crit, debug, trace, warn};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use types::{Epoch, EthSpec, Hash256, Slot};
|
||||
use types::{Epoch, EthSpec, Hash256};
|
||||
|
||||
/// For how long we store failed finalized chains to prevent retries.
|
||||
const FAILED_CHAINS_EXPIRY_SECONDS: u64 = 30;
|
||||
@@ -64,27 +63,26 @@ const FAILED_CHAINS_EXPIRY_SECONDS: u64 = 30;
|
||||
/// The primary object dealing with long range/batch syncing. This contains all the active and
|
||||
/// non-active chains that need to be processed before the syncing is considered complete. This
|
||||
/// holds the current state of the long range sync.
|
||||
pub struct RangeSync<T: BeaconChainTypes, C = BeaconChain<T>> {
|
||||
pub struct RangeSync<T: BeaconChainTypes> {
|
||||
/// The beacon chain for processing.
|
||||
beacon_chain: Arc<C>,
|
||||
beacon_chain: Arc<BeaconChain<T>>,
|
||||
/// Last known sync info of our useful connected peers. We use this information to create Head
|
||||
/// chains after all finalized chains have ended.
|
||||
awaiting_head_peers: HashMap<PeerId, SyncInfo>,
|
||||
/// A collection of chains that need to be downloaded. This stores any head or finalized chains
|
||||
/// that need to be downloaded.
|
||||
chains: ChainCollection<T, C>,
|
||||
chains: ChainCollection<T>,
|
||||
/// Chains that have failed and are stored to prevent being retried.
|
||||
failed_chains: LRUTimeCache<Hash256>,
|
||||
/// The syncing logger.
|
||||
log: slog::Logger,
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes, C> RangeSync<T, C>
|
||||
impl<T: BeaconChainTypes> RangeSync<T>
|
||||
where
|
||||
C: BlockStorage + ToStatusMessage,
|
||||
T: BeaconChainTypes,
|
||||
{
|
||||
pub fn new(beacon_chain: Arc<C>, log: slog::Logger) -> Self {
|
||||
pub fn new(beacon_chain: Arc<BeaconChain<T>>, log: slog::Logger) -> Self {
|
||||
RangeSync {
|
||||
beacon_chain: beacon_chain.clone(),
|
||||
chains: ChainCollection::new(beacon_chain, log.clone()),
|
||||
@@ -96,9 +94,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
pub fn state(
|
||||
&self,
|
||||
) -> Result<Option<(RangeSyncType, Slot /* from */, Slot /* to */)>, &'static str> {
|
||||
pub fn state(&self) -> SyncChainStatus {
|
||||
self.chains.state()
|
||||
}
|
||||
|
||||
@@ -382,465 +378,3 @@ where
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::network_beacon_processor::NetworkBeaconProcessor;
|
||||
use crate::sync::SyncMessage;
|
||||
use crate::NetworkMessage;
|
||||
|
||||
use super::*;
|
||||
use crate::sync::network_context::{BlockOrBlob, RangeRequestId};
|
||||
use beacon_chain::builder::Witness;
|
||||
use beacon_chain::eth1_chain::CachingEth1Backend;
|
||||
use beacon_chain::parking_lot::RwLock;
|
||||
use beacon_chain::test_utils::{BeaconChainHarness, EphemeralHarnessType};
|
||||
use beacon_chain::EngineState;
|
||||
use beacon_processor::WorkEvent as BeaconWorkEvent;
|
||||
use lighthouse_network::service::api_types::SyncRequestId;
|
||||
use lighthouse_network::{
|
||||
rpc::StatusMessage, service::api_types::AppRequestId, NetworkConfig, NetworkGlobals,
|
||||
};
|
||||
use slog::{o, Drain};
|
||||
use slot_clock::TestingSlotClock;
|
||||
use std::collections::HashSet;
|
||||
use store::MemoryStore;
|
||||
use tokio::sync::mpsc;
|
||||
use types::{FixedBytesExtended, ForkName, MinimalEthSpec as E};
|
||||
|
||||
#[derive(Debug)]
|
||||
struct FakeStorage {
|
||||
known_blocks: RwLock<HashSet<Hash256>>,
|
||||
status: RwLock<StatusMessage>,
|
||||
}
|
||||
|
||||
impl Default for FakeStorage {
|
||||
fn default() -> Self {
|
||||
FakeStorage {
|
||||
known_blocks: RwLock::new(HashSet::new()),
|
||||
status: RwLock::new(StatusMessage {
|
||||
fork_digest: [0; 4],
|
||||
finalized_root: Hash256::zero(),
|
||||
finalized_epoch: 0usize.into(),
|
||||
head_root: Hash256::zero(),
|
||||
head_slot: 0usize.into(),
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl FakeStorage {
|
||||
fn remember_block(&self, block_root: Hash256) {
|
||||
self.known_blocks.write().insert(block_root);
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn forget_block(&self, block_root: &Hash256) {
|
||||
self.known_blocks.write().remove(block_root);
|
||||
}
|
||||
}
|
||||
|
||||
impl BlockStorage for FakeStorage {
|
||||
fn is_block_known(&self, block_root: &store::Hash256) -> bool {
|
||||
self.known_blocks.read().contains(block_root)
|
||||
}
|
||||
}
|
||||
|
||||
impl ToStatusMessage for FakeStorage {
|
||||
fn status_message(&self) -> StatusMessage {
|
||||
self.status.read().clone()
|
||||
}
|
||||
}
|
||||
|
||||
type TestBeaconChainType =
|
||||
Witness<TestingSlotClock, CachingEth1Backend<E>, E, MemoryStore<E>, MemoryStore<E>>;
|
||||
|
||||
fn build_log(level: slog::Level, enabled: bool) -> slog::Logger {
|
||||
let decorator = slog_term::TermDecorator::new().build();
|
||||
let drain = slog_term::FullFormat::new(decorator).build().fuse();
|
||||
let drain = slog_async::Async::new(drain).build().fuse();
|
||||
|
||||
if enabled {
|
||||
slog::Logger::root(drain.filter_level(level).fuse(), o!())
|
||||
} else {
|
||||
slog::Logger::root(drain.filter(|_| false).fuse(), o!())
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
struct TestRig {
|
||||
log: slog::Logger,
|
||||
/// To check what does sync send to the beacon processor.
|
||||
beacon_processor_rx: mpsc::Receiver<BeaconWorkEvent<E>>,
|
||||
/// To set up different scenarios where sync is told about known/unknown blocks.
|
||||
chain: Arc<FakeStorage>,
|
||||
/// Needed by range to handle communication with the network.
|
||||
cx: SyncNetworkContext<TestBeaconChainType>,
|
||||
/// To check what the network receives from Range.
|
||||
network_rx: mpsc::UnboundedReceiver<NetworkMessage<E>>,
|
||||
/// To modify what the network declares about various global variables, in particular about
|
||||
/// the sync state of a peer.
|
||||
globals: Arc<NetworkGlobals<E>>,
|
||||
}
|
||||
|
||||
impl RangeSync<TestBeaconChainType, FakeStorage> {
|
||||
fn assert_state(&self, expected_state: RangeSyncType) {
|
||||
assert_eq!(
|
||||
self.state()
|
||||
.expect("State is ok")
|
||||
.expect("Range is syncing")
|
||||
.0,
|
||||
expected_state
|
||||
)
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn assert_not_syncing(&self) {
|
||||
assert!(
|
||||
self.state().expect("State is ok").is_none(),
|
||||
"Range should not be syncing."
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
impl TestRig {
|
||||
fn local_info(&self) -> SyncInfo {
|
||||
let StatusMessage {
|
||||
fork_digest: _,
|
||||
finalized_root,
|
||||
finalized_epoch,
|
||||
head_root,
|
||||
head_slot,
|
||||
} = self.chain.status.read().clone();
|
||||
SyncInfo {
|
||||
head_slot,
|
||||
head_root,
|
||||
finalized_epoch,
|
||||
finalized_root,
|
||||
}
|
||||
}
|
||||
|
||||
/// Reads an BlocksByRange request to a given peer from the network receiver channel.
|
||||
#[track_caller]
|
||||
fn grab_request(
|
||||
&mut self,
|
||||
expected_peer: &PeerId,
|
||||
fork_name: ForkName,
|
||||
) -> (AppRequestId, Option<AppRequestId>) {
|
||||
let block_req_id = if let Ok(NetworkMessage::SendRequest {
|
||||
peer_id,
|
||||
request: _,
|
||||
request_id,
|
||||
}) = self.network_rx.try_recv()
|
||||
{
|
||||
assert_eq!(&peer_id, expected_peer);
|
||||
request_id
|
||||
} else {
|
||||
panic!("Should have sent a batch request to the peer")
|
||||
};
|
||||
let blob_req_id = if fork_name.deneb_enabled() {
|
||||
if let Ok(NetworkMessage::SendRequest {
|
||||
peer_id,
|
||||
request: _,
|
||||
request_id,
|
||||
}) = self.network_rx.try_recv()
|
||||
{
|
||||
assert_eq!(&peer_id, expected_peer);
|
||||
Some(request_id)
|
||||
} else {
|
||||
panic!("Should have sent a batch request to the peer")
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
(block_req_id, blob_req_id)
|
||||
}
|
||||
|
||||
fn complete_range_block_and_blobs_response(
|
||||
&mut self,
|
||||
block_req: AppRequestId,
|
||||
blob_req_opt: Option<AppRequestId>,
|
||||
) -> (ChainId, BatchId, Id) {
|
||||
if blob_req_opt.is_some() {
|
||||
match block_req {
|
||||
AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }) => {
|
||||
let _ = self
|
||||
.cx
|
||||
.range_block_and_blob_response(id, BlockOrBlob::Block(None));
|
||||
let response = self
|
||||
.cx
|
||||
.range_block_and_blob_response(id, BlockOrBlob::Blob(None))
|
||||
.unwrap();
|
||||
let (chain_id, batch_id) =
|
||||
TestRig::unwrap_range_request_id(response.sender_id);
|
||||
(chain_id, batch_id, id)
|
||||
}
|
||||
other => panic!("unexpected request {:?}", other),
|
||||
}
|
||||
} else {
|
||||
match block_req {
|
||||
AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }) => {
|
||||
let response = self
|
||||
.cx
|
||||
.range_block_and_blob_response(id, BlockOrBlob::Block(None))
|
||||
.unwrap();
|
||||
let (chain_id, batch_id) =
|
||||
TestRig::unwrap_range_request_id(response.sender_id);
|
||||
(chain_id, batch_id, id)
|
||||
}
|
||||
other => panic!("unexpected request {:?}", other),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn unwrap_range_request_id(sender_id: RangeRequestId) -> (ChainId, BatchId) {
|
||||
if let RangeRequestId::RangeSync { chain_id, batch_id } = sender_id {
|
||||
(chain_id, batch_id)
|
||||
} else {
|
||||
panic!("expected RangeSync request: {:?}", sender_id)
|
||||
}
|
||||
}
|
||||
|
||||
/// Produce a head peer
|
||||
fn head_peer(
|
||||
&self,
|
||||
) -> (
|
||||
PeerId,
|
||||
SyncInfo, /* Local info */
|
||||
SyncInfo, /* Remote info */
|
||||
) {
|
||||
let local_info = self.local_info();
|
||||
|
||||
// Get a peer with an advanced head
|
||||
let head_root = Hash256::random();
|
||||
let head_slot = local_info.head_slot + 1;
|
||||
let remote_info = SyncInfo {
|
||||
head_root,
|
||||
head_slot,
|
||||
..local_info
|
||||
};
|
||||
let peer_id = PeerId::random();
|
||||
(peer_id, local_info, remote_info)
|
||||
}
|
||||
|
||||
fn finalized_peer(
|
||||
&self,
|
||||
) -> (
|
||||
PeerId,
|
||||
SyncInfo, /* Local info */
|
||||
SyncInfo, /* Remote info */
|
||||
) {
|
||||
let local_info = self.local_info();
|
||||
|
||||
let finalized_root = Hash256::random();
|
||||
let finalized_epoch = local_info.finalized_epoch + 2;
|
||||
let head_slot = finalized_epoch.start_slot(E::slots_per_epoch());
|
||||
let head_root = Hash256::random();
|
||||
let remote_info = SyncInfo {
|
||||
finalized_epoch,
|
||||
finalized_root,
|
||||
head_slot,
|
||||
head_root,
|
||||
};
|
||||
|
||||
let peer_id = PeerId::random();
|
||||
(peer_id, local_info, remote_info)
|
||||
}
|
||||
|
||||
#[track_caller]
|
||||
fn expect_empty_processor(&mut self) {
|
||||
match self.beacon_processor_rx.try_recv() {
|
||||
Ok(work) => {
|
||||
panic!(
|
||||
"Expected empty processor. Instead got {}",
|
||||
work.work_type_str()
|
||||
);
|
||||
}
|
||||
Err(e) => match e {
|
||||
mpsc::error::TryRecvError::Empty => {}
|
||||
mpsc::error::TryRecvError::Disconnected => unreachable!("bad coded test?"),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
#[track_caller]
|
||||
fn expect_chain_segment(&mut self) {
|
||||
match self.beacon_processor_rx.try_recv() {
|
||||
Ok(work) => {
|
||||
assert_eq!(work.work_type(), beacon_processor::WorkType::ChainSegment);
|
||||
}
|
||||
other => panic!("Expected chain segment process, found {:?}", other),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn range(log_enabled: bool) -> (TestRig, RangeSync<TestBeaconChainType, FakeStorage>) {
|
||||
let log = build_log(slog::Level::Trace, log_enabled);
|
||||
// Initialise a new beacon chain
|
||||
let harness = BeaconChainHarness::<EphemeralHarnessType<E>>::builder(E)
|
||||
.default_spec()
|
||||
.logger(log.clone())
|
||||
.deterministic_keypairs(1)
|
||||
.fresh_ephemeral_store()
|
||||
.build();
|
||||
let chain = harness.chain;
|
||||
|
||||
let fake_store = Arc::new(FakeStorage::default());
|
||||
let range_sync = RangeSync::<TestBeaconChainType, FakeStorage>::new(
|
||||
fake_store.clone(),
|
||||
log.new(o!("component" => "range")),
|
||||
);
|
||||
let (network_tx, network_rx) = mpsc::unbounded_channel();
|
||||
let (sync_tx, _sync_rx) = mpsc::unbounded_channel::<SyncMessage<E>>();
|
||||
let network_config = Arc::new(NetworkConfig::default());
|
||||
let globals = Arc::new(NetworkGlobals::new_test_globals(
|
||||
Vec::new(),
|
||||
&log,
|
||||
network_config,
|
||||
chain.spec.clone(),
|
||||
));
|
||||
let (network_beacon_processor, beacon_processor_rx) =
|
||||
NetworkBeaconProcessor::null_for_testing(
|
||||
globals.clone(),
|
||||
sync_tx,
|
||||
chain.clone(),
|
||||
harness.runtime.task_executor.clone(),
|
||||
log.clone(),
|
||||
);
|
||||
let cx = SyncNetworkContext::new(
|
||||
network_tx,
|
||||
Arc::new(network_beacon_processor),
|
||||
chain,
|
||||
log.new(o!("component" => "network_context")),
|
||||
);
|
||||
let test_rig = TestRig {
|
||||
log,
|
||||
beacon_processor_rx,
|
||||
chain: fake_store,
|
||||
cx,
|
||||
network_rx,
|
||||
globals,
|
||||
};
|
||||
(test_rig, range_sync)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn head_chain_removed_while_finalized_syncing() {
|
||||
// NOTE: this is a regression test.
|
||||
let (mut rig, mut range) = range(false);
|
||||
|
||||
// Get a peer with an advanced head
|
||||
let (head_peer, local_info, remote_info) = rig.head_peer();
|
||||
range.add_peer(&mut rig.cx, local_info, head_peer, remote_info);
|
||||
range.assert_state(RangeSyncType::Head);
|
||||
|
||||
let fork = rig
|
||||
.cx
|
||||
.chain
|
||||
.spec
|
||||
.fork_name_at_epoch(rig.cx.chain.epoch().unwrap());
|
||||
|
||||
// Sync should have requested a batch, grab the request.
|
||||
let _ = rig.grab_request(&head_peer, fork);
|
||||
|
||||
// Now get a peer with an advanced finalized epoch.
|
||||
let (finalized_peer, local_info, remote_info) = rig.finalized_peer();
|
||||
range.add_peer(&mut rig.cx, local_info, finalized_peer, remote_info);
|
||||
range.assert_state(RangeSyncType::Finalized);
|
||||
|
||||
// Sync should have requested a batch, grab the request
|
||||
let _ = rig.grab_request(&finalized_peer, fork);
|
||||
|
||||
// Fail the head chain by disconnecting the peer.
|
||||
range.remove_peer(&mut rig.cx, &head_peer);
|
||||
range.assert_state(RangeSyncType::Finalized);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn state_update_while_purging() {
|
||||
// NOTE: this is a regression test.
|
||||
let (mut rig, mut range) = range(true);
|
||||
|
||||
// Get a peer with an advanced head
|
||||
let (head_peer, local_info, head_info) = rig.head_peer();
|
||||
let head_peer_root = head_info.head_root;
|
||||
range.add_peer(&mut rig.cx, local_info, head_peer, head_info);
|
||||
range.assert_state(RangeSyncType::Head);
|
||||
|
||||
let fork = rig
|
||||
.cx
|
||||
.chain
|
||||
.spec
|
||||
.fork_name_at_epoch(rig.cx.chain.epoch().unwrap());
|
||||
|
||||
// Sync should have requested a batch, grab the request.
|
||||
let _ = rig.grab_request(&head_peer, fork);
|
||||
|
||||
// Now get a peer with an advanced finalized epoch.
|
||||
let (finalized_peer, local_info, remote_info) = rig.finalized_peer();
|
||||
let finalized_peer_root = remote_info.finalized_root;
|
||||
range.add_peer(&mut rig.cx, local_info, finalized_peer, remote_info);
|
||||
range.assert_state(RangeSyncType::Finalized);
|
||||
|
||||
// Sync should have requested a batch, grab the request
|
||||
let _ = rig.grab_request(&finalized_peer, fork);
|
||||
|
||||
// Now the chain knows both chains target roots.
|
||||
rig.chain.remember_block(head_peer_root);
|
||||
rig.chain.remember_block(finalized_peer_root);
|
||||
|
||||
// Add an additional peer to the second chain to make range update it's status
|
||||
let (finalized_peer, local_info, remote_info) = rig.finalized_peer();
|
||||
range.add_peer(&mut rig.cx, local_info, finalized_peer, remote_info);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn pause_and_resume_on_ee_offline() {
|
||||
let (mut rig, mut range) = range(true);
|
||||
let fork = rig
|
||||
.cx
|
||||
.chain
|
||||
.spec
|
||||
.fork_name_at_epoch(rig.cx.chain.epoch().unwrap());
|
||||
|
||||
// add some peers
|
||||
let (peer1, local_info, head_info) = rig.head_peer();
|
||||
range.add_peer(&mut rig.cx, local_info, peer1, head_info);
|
||||
let (block_req, blob_req_opt) = rig.grab_request(&peer1, fork);
|
||||
|
||||
let (chain1, batch1, id1) =
|
||||
rig.complete_range_block_and_blobs_response(block_req, blob_req_opt);
|
||||
|
||||
// make the ee offline
|
||||
rig.cx.update_execution_engine_state(EngineState::Offline);
|
||||
|
||||
// send the response to the request
|
||||
range.blocks_by_range_response(&mut rig.cx, peer1, chain1, batch1, id1, vec![]);
|
||||
|
||||
// the beacon processor shouldn't have received any work
|
||||
rig.expect_empty_processor();
|
||||
|
||||
// while the ee is offline, more peers might arrive. Add a new finalized peer.
|
||||
let (peer2, local_info, finalized_info) = rig.finalized_peer();
|
||||
range.add_peer(&mut rig.cx, local_info, peer2, finalized_info);
|
||||
let (block_req, blob_req_opt) = rig.grab_request(&peer2, fork);
|
||||
|
||||
let (chain2, batch2, id2) =
|
||||
rig.complete_range_block_and_blobs_response(block_req, blob_req_opt);
|
||||
|
||||
// send the response to the request
|
||||
range.blocks_by_range_response(&mut rig.cx, peer2, chain2, batch2, id2, vec![]);
|
||||
|
||||
// the beacon processor shouldn't have received any work
|
||||
rig.expect_empty_processor();
|
||||
|
||||
// make the beacon processor available again.
|
||||
rig.cx.update_execution_engine_state(EngineState::Online);
|
||||
|
||||
// now resume range, we should have two processing requests in the beacon processor.
|
||||
range.resume(&mut rig.cx);
|
||||
|
||||
rig.expect_chain_segment();
|
||||
rig.expect_chain_segment();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,10 +1,9 @@
|
||||
//! Contains logic about identifying which Sync to perform given PeerSyncInfo of ourselves and
|
||||
//! of a remote.
|
||||
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
use lighthouse_network::SyncInfo;
|
||||
|
||||
use super::block_storage::BlockStorage;
|
||||
|
||||
/// The type of Range sync that should be done relative to our current state.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum RangeSyncType {
|
||||
@@ -17,8 +16,8 @@ pub enum RangeSyncType {
|
||||
impl RangeSyncType {
|
||||
/// Determines the type of sync given our local `PeerSyncInfo` and the remote's
|
||||
/// `PeerSyncInfo`.
|
||||
pub fn new<C: BlockStorage>(
|
||||
chain: &C,
|
||||
pub fn new<T: BeaconChainTypes>(
|
||||
chain: &BeaconChain<T>,
|
||||
local_info: &SyncInfo,
|
||||
remote_info: &SyncInfo,
|
||||
) -> RangeSyncType {
|
||||
@@ -29,7 +28,7 @@ impl RangeSyncType {
|
||||
// not seen the finalized hash before.
|
||||
|
||||
if remote_info.finalized_epoch > local_info.finalized_epoch
|
||||
&& !chain.is_block_known(&remote_info.finalized_root)
|
||||
&& !chain.block_is_known_to_fork_choice(&remote_info.finalized_root)
|
||||
{
|
||||
RangeSyncType::Finalized
|
||||
} else {
|
||||
|
||||
@@ -83,6 +83,7 @@ impl TestRig {
|
||||
.logger(log.clone())
|
||||
.deterministic_keypairs(1)
|
||||
.fresh_ephemeral_store()
|
||||
.mock_execution_layer()
|
||||
.testing_slot_clock(TestingSlotClock::new(
|
||||
Slot::new(0),
|
||||
Duration::from_secs(0),
|
||||
@@ -144,7 +145,7 @@ impl TestRig {
|
||||
}
|
||||
}
|
||||
|
||||
fn test_setup() -> Self {
|
||||
pub fn test_setup() -> Self {
|
||||
Self::test_setup_with_config(None)
|
||||
}
|
||||
|
||||
@@ -168,11 +169,11 @@ impl TestRig {
|
||||
}
|
||||
}
|
||||
|
||||
fn log(&self, msg: &str) {
|
||||
pub fn log(&self, msg: &str) {
|
||||
info!(self.log, "TEST_RIG"; "msg" => msg);
|
||||
}
|
||||
|
||||
fn after_deneb(&self) -> bool {
|
||||
pub fn after_deneb(&self) -> bool {
|
||||
matches!(self.fork_name, ForkName::Deneb | ForkName::Electra)
|
||||
}
|
||||
|
||||
@@ -238,7 +239,7 @@ impl TestRig {
|
||||
(parent, block, parent_root, block_root)
|
||||
}
|
||||
|
||||
fn send_sync_message(&mut self, sync_message: SyncMessage<E>) {
|
||||
pub fn send_sync_message(&mut self, sync_message: SyncMessage<E>) {
|
||||
self.sync_manager.handle_message(sync_message);
|
||||
}
|
||||
|
||||
@@ -369,7 +370,7 @@ impl TestRig {
|
||||
self.expect_empty_network();
|
||||
}
|
||||
|
||||
fn new_connected_peer(&mut self) -> PeerId {
|
||||
pub fn new_connected_peer(&mut self) -> PeerId {
|
||||
self.network_globals
|
||||
.peers
|
||||
.write()
|
||||
@@ -811,7 +812,7 @@ impl TestRig {
|
||||
}
|
||||
}
|
||||
|
||||
fn peer_disconnected(&mut self, peer_id: PeerId) {
|
||||
pub fn peer_disconnected(&mut self, peer_id: PeerId) {
|
||||
self.send_sync_message(SyncMessage::Disconnect(peer_id));
|
||||
}
|
||||
|
||||
@@ -827,7 +828,7 @@ impl TestRig {
|
||||
}
|
||||
}
|
||||
|
||||
fn pop_received_network_event<T, F: Fn(&NetworkMessage<E>) -> Option<T>>(
|
||||
pub fn pop_received_network_event<T, F: Fn(&NetworkMessage<E>) -> Option<T>>(
|
||||
&mut self,
|
||||
predicate_transform: F,
|
||||
) -> Result<T, String> {
|
||||
@@ -847,7 +848,7 @@ impl TestRig {
|
||||
}
|
||||
}
|
||||
|
||||
fn pop_received_processor_event<T, F: Fn(&WorkEvent<E>) -> Option<T>>(
|
||||
pub fn pop_received_processor_event<T, F: Fn(&WorkEvent<E>) -> Option<T>>(
|
||||
&mut self,
|
||||
predicate_transform: F,
|
||||
) -> Result<T, String> {
|
||||
@@ -871,6 +872,16 @@ impl TestRig {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn expect_empty_processor(&mut self) {
|
||||
self.drain_processor_rx();
|
||||
if !self.beacon_processor_rx_queue.is_empty() {
|
||||
panic!(
|
||||
"Expected processor to be empty, but has events: {:?}",
|
||||
self.beacon_processor_rx_queue
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn find_block_lookup_request(
|
||||
&mut self,
|
||||
for_block: Hash256,
|
||||
@@ -2173,7 +2184,8 @@ fn custody_lookup_happy_path() {
|
||||
mod deneb_only {
|
||||
use super::*;
|
||||
use beacon_chain::{
|
||||
block_verification_types::RpcBlock, data_availability_checker::AvailabilityCheckError,
|
||||
block_verification_types::{AsBlock, RpcBlock},
|
||||
data_availability_checker::AvailabilityCheckError,
|
||||
};
|
||||
use ssz_types::VariableList;
|
||||
use std::collections::VecDeque;
|
||||
|
||||
@@ -1 +1,273 @@
|
||||
use super::*;
|
||||
use crate::status::ToStatusMessage;
|
||||
use crate::sync::manager::SLOT_IMPORT_TOLERANCE;
|
||||
use crate::sync::range_sync::RangeSyncType;
|
||||
use crate::sync::SyncMessage;
|
||||
use beacon_chain::test_utils::{AttestationStrategy, BlockStrategy};
|
||||
use beacon_chain::EngineState;
|
||||
use lighthouse_network::rpc::{RequestType, StatusMessage};
|
||||
use lighthouse_network::service::api_types::{AppRequestId, Id, SyncRequestId};
|
||||
use lighthouse_network::{PeerId, SyncInfo};
|
||||
use std::time::Duration;
|
||||
use types::{EthSpec, Hash256, MinimalEthSpec as E, SignedBeaconBlock, Slot};
|
||||
|
||||
const D: Duration = Duration::new(0, 0);
|
||||
|
||||
impl TestRig {
|
||||
/// Produce a head peer with an advanced head
|
||||
fn add_head_peer(&mut self) -> PeerId {
|
||||
self.add_head_peer_with_root(Hash256::random())
|
||||
}
|
||||
|
||||
/// Produce a head peer with an advanced head
|
||||
fn add_head_peer_with_root(&mut self, head_root: Hash256) -> PeerId {
|
||||
let local_info = self.local_info();
|
||||
self.add_peer(SyncInfo {
|
||||
head_root,
|
||||
head_slot: local_info.head_slot + 1 + Slot::new(SLOT_IMPORT_TOLERANCE as u64),
|
||||
..local_info
|
||||
})
|
||||
}
|
||||
|
||||
// Produce a finalized peer with an advanced finalized epoch
|
||||
fn add_finalized_peer(&mut self) -> PeerId {
|
||||
self.add_finalized_peer_with_root(Hash256::random())
|
||||
}
|
||||
|
||||
// Produce a finalized peer with an advanced finalized epoch
|
||||
fn add_finalized_peer_with_root(&mut self, finalized_root: Hash256) -> PeerId {
|
||||
let local_info = self.local_info();
|
||||
let finalized_epoch = local_info.finalized_epoch + 2;
|
||||
self.add_peer(SyncInfo {
|
||||
finalized_epoch,
|
||||
finalized_root,
|
||||
head_slot: finalized_epoch.start_slot(E::slots_per_epoch()),
|
||||
head_root: Hash256::random(),
|
||||
})
|
||||
}
|
||||
|
||||
fn local_info(&self) -> SyncInfo {
|
||||
let StatusMessage {
|
||||
fork_digest: _,
|
||||
finalized_root,
|
||||
finalized_epoch,
|
||||
head_root,
|
||||
head_slot,
|
||||
} = self.harness.chain.status_message();
|
||||
SyncInfo {
|
||||
head_slot,
|
||||
head_root,
|
||||
finalized_epoch,
|
||||
finalized_root,
|
||||
}
|
||||
}
|
||||
|
||||
fn add_peer(&mut self, remote_info: SyncInfo) -> PeerId {
|
||||
// Create valid peer known to network globals
|
||||
let peer_id = self.new_connected_peer();
|
||||
// Send peer to sync
|
||||
self.send_sync_message(SyncMessage::AddPeer(peer_id, remote_info.clone()));
|
||||
peer_id
|
||||
}
|
||||
|
||||
fn assert_state(&self, state: RangeSyncType) {
|
||||
assert_eq!(
|
||||
self.sync_manager
|
||||
.range_sync_state()
|
||||
.expect("State is ok")
|
||||
.expect("Range should be syncing")
|
||||
.0,
|
||||
state,
|
||||
"not expected range sync state"
|
||||
);
|
||||
}
|
||||
|
||||
#[track_caller]
|
||||
fn expect_chain_segment(&mut self) {
|
||||
self.pop_received_processor_event(|ev| {
|
||||
(ev.work_type() == beacon_processor::WorkType::ChainSegment).then_some(())
|
||||
})
|
||||
.unwrap_or_else(|e| panic!("Expect ChainSegment work event: {e:?}"));
|
||||
}
|
||||
|
||||
fn update_execution_engine_state(&mut self, state: EngineState) {
|
||||
self.log(&format!("execution engine state updated: {state:?}"));
|
||||
self.sync_manager.update_execution_engine_state(state);
|
||||
}
|
||||
|
||||
fn find_blocks_by_range_request(&mut self, target_peer_id: &PeerId) -> (Id, Option<Id>) {
|
||||
let block_req_id = self
|
||||
.pop_received_network_event(|ev| match ev {
|
||||
NetworkMessage::SendRequest {
|
||||
peer_id,
|
||||
request: RequestType::BlocksByRange(_),
|
||||
request_id: AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }),
|
||||
} if peer_id == target_peer_id => Some(*id),
|
||||
_ => None,
|
||||
})
|
||||
.expect("Should have a blocks by range request");
|
||||
|
||||
let blob_req_id = if self.after_deneb() {
|
||||
Some(
|
||||
self.pop_received_network_event(|ev| match ev {
|
||||
NetworkMessage::SendRequest {
|
||||
peer_id,
|
||||
request: RequestType::BlobsByRange(_),
|
||||
request_id: AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }),
|
||||
} if peer_id == target_peer_id => Some(*id),
|
||||
_ => None,
|
||||
})
|
||||
.expect("Should have a blobs by range request"),
|
||||
)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
(block_req_id, blob_req_id)
|
||||
}
|
||||
|
||||
fn find_and_complete_blocks_by_range_request(&mut self, target_peer_id: PeerId) {
|
||||
let (blocks_req_id, blobs_req_id) = self.find_blocks_by_range_request(&target_peer_id);
|
||||
|
||||
// Complete the request with a single stream termination
|
||||
self.log(&format!(
|
||||
"Completing BlocksByRange request {blocks_req_id} with empty stream"
|
||||
));
|
||||
self.send_sync_message(SyncMessage::RpcBlock {
|
||||
request_id: SyncRequestId::RangeBlockAndBlobs { id: blocks_req_id },
|
||||
peer_id: target_peer_id,
|
||||
beacon_block: None,
|
||||
seen_timestamp: D,
|
||||
});
|
||||
|
||||
if let Some(blobs_req_id) = blobs_req_id {
|
||||
// Complete the request with a single stream termination
|
||||
self.log(&format!(
|
||||
"Completing BlobsByRange request {blobs_req_id} with empty stream"
|
||||
));
|
||||
self.send_sync_message(SyncMessage::RpcBlob {
|
||||
request_id: SyncRequestId::RangeBlockAndBlobs { id: blobs_req_id },
|
||||
peer_id: target_peer_id,
|
||||
blob_sidecar: None,
|
||||
seen_timestamp: D,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async fn create_canonical_block(&mut self) -> SignedBeaconBlock<E> {
|
||||
self.harness.advance_slot();
|
||||
|
||||
let block_root = self
|
||||
.harness
|
||||
.extend_chain(
|
||||
1,
|
||||
BlockStrategy::OnCanonicalHead,
|
||||
AttestationStrategy::AllValidators,
|
||||
)
|
||||
.await;
|
||||
self.harness
|
||||
.chain
|
||||
.store
|
||||
.get_full_block(&block_root)
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
async fn remember_block(&mut self, block: SignedBeaconBlock<E>) {
|
||||
self.harness
|
||||
.process_block(block.slot(), block.canonical_root(), (block.into(), None))
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn head_chain_removed_while_finalized_syncing() {
|
||||
// NOTE: this is a regression test.
|
||||
// Added in PR https://github.com/sigp/lighthouse/pull/2821
|
||||
let mut rig = TestRig::test_setup();
|
||||
|
||||
// Get a peer with an advanced head
|
||||
let head_peer = rig.add_head_peer();
|
||||
rig.assert_state(RangeSyncType::Head);
|
||||
|
||||
// Sync should have requested a batch, grab the request.
|
||||
let _ = rig.find_blocks_by_range_request(&head_peer);
|
||||
|
||||
// Now get a peer with an advanced finalized epoch.
|
||||
let finalized_peer = rig.add_finalized_peer();
|
||||
rig.assert_state(RangeSyncType::Finalized);
|
||||
|
||||
// Sync should have requested a batch, grab the request
|
||||
let _ = rig.find_blocks_by_range_request(&finalized_peer);
|
||||
|
||||
// Fail the head chain by disconnecting the peer.
|
||||
rig.peer_disconnected(head_peer);
|
||||
rig.assert_state(RangeSyncType::Finalized);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn state_update_while_purging() {
|
||||
// NOTE: this is a regression test.
|
||||
// Added in PR https://github.com/sigp/lighthouse/pull/2827
|
||||
let mut rig = TestRig::test_setup();
|
||||
|
||||
// Create blocks on a separate harness
|
||||
let mut rig_2 = TestRig::test_setup();
|
||||
// Need to create blocks that can be inserted into the fork-choice and fit the "known
|
||||
// conditions" below.
|
||||
let head_peer_block = rig_2.create_canonical_block().await;
|
||||
let head_peer_root = head_peer_block.canonical_root();
|
||||
let finalized_peer_block = rig_2.create_canonical_block().await;
|
||||
let finalized_peer_root = finalized_peer_block.canonical_root();
|
||||
|
||||
// Get a peer with an advanced head
|
||||
let head_peer = rig.add_head_peer_with_root(head_peer_root);
|
||||
rig.assert_state(RangeSyncType::Head);
|
||||
|
||||
// Sync should have requested a batch, grab the request.
|
||||
let _ = rig.find_blocks_by_range_request(&head_peer);
|
||||
|
||||
// Now get a peer with an advanced finalized epoch.
|
||||
let finalized_peer = rig.add_finalized_peer_with_root(finalized_peer_root);
|
||||
rig.assert_state(RangeSyncType::Finalized);
|
||||
|
||||
// Sync should have requested a batch, grab the request
|
||||
let _ = rig.find_blocks_by_range_request(&finalized_peer);
|
||||
|
||||
// Now the chain knows both chains target roots.
|
||||
rig.remember_block(head_peer_block).await;
|
||||
rig.remember_block(finalized_peer_block).await;
|
||||
|
||||
// Add an additional peer to the second chain to make range update it's status
|
||||
rig.add_finalized_peer();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn pause_and_resume_on_ee_offline() {
|
||||
let mut rig = TestRig::test_setup();
|
||||
|
||||
// add some peers
|
||||
let peer1 = rig.add_head_peer();
|
||||
// make the ee offline
|
||||
rig.update_execution_engine_state(EngineState::Offline);
|
||||
// send the response to the request
|
||||
rig.find_and_complete_blocks_by_range_request(peer1);
|
||||
// the beacon processor shouldn't have received any work
|
||||
rig.expect_empty_processor();
|
||||
|
||||
// while the ee is offline, more peers might arrive. Add a new finalized peer.
|
||||
let peer2 = rig.add_finalized_peer();
|
||||
|
||||
// send the response to the request
|
||||
rig.find_and_complete_blocks_by_range_request(peer2);
|
||||
// the beacon processor shouldn't have received any work
|
||||
rig.expect_empty_processor();
|
||||
// make the beacon processor available again.
|
||||
// update_execution_engine_state implicitly calls resume
|
||||
// now resume range, we should have two processing requests in the beacon processor.
|
||||
rig.update_execution_engine_state(EngineState::Online);
|
||||
|
||||
rig.expect_chain_segment();
|
||||
rig.expect_chain_segment();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user