Merge branch 'unstable' into gloas-containers

This commit is contained in:
Eitan Seri-Levi
2025-12-03 17:06:10 -03:00
committed by GitHub
19 changed files with 413 additions and 362 deletions

View File

@@ -179,7 +179,7 @@ jobs:
continue-on-error: true
strategy:
matrix:
network: [sepolia, devnet]
network: [sepolia]
steps:
- uses: actions/checkout@v5

1
Cargo.lock generated
View File

@@ -8336,6 +8336,7 @@ dependencies = [
"reqwest",
"serde",
"task_executor",
"tracing",
"types",
"url",
"validator_metrics",

View File

@@ -1248,7 +1248,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let num_required_columns = T::EthSpec::number_of_columns() / 2;
let reconstruction_possible = columns.len() >= num_required_columns;
if reconstruction_possible {
reconstruct_blobs(&self.kzg, &columns, None, &block, &self.spec)
reconstruct_blobs(&self.kzg, columns, None, &block, &self.spec)
.map(Some)
.map_err(Error::FailedToReconstructBlobs)
} else {

View File

@@ -308,12 +308,14 @@ pub(crate) fn build_data_column_sidecars<E: EthSpec>(
/// and it will be slow if the node needs to reconstruct the blobs
pub fn reconstruct_blobs<E: EthSpec>(
kzg: &Kzg,
data_columns: &[Arc<DataColumnSidecar<E>>],
mut data_columns: Vec<Arc<DataColumnSidecar<E>>>,
blob_indices_opt: Option<Vec<u64>>,
signed_block: &SignedBlindedBeaconBlock<E>,
spec: &ChainSpec,
) -> Result<BlobSidecarList<E>, String> {
// The data columns are from the database, so we assume their correctness.
// Sort data columns by index to ensure ascending order for KZG operations
data_columns.sort_unstable_by_key(|dc| dc.index);
let first_data_column = data_columns
.first()
.ok_or("data_columns should have at least one element".to_string())?;
@@ -331,7 +333,7 @@ pub fn reconstruct_blobs<E: EthSpec>(
.map(|row_index| {
let mut cells: Vec<KzgCellRef> = vec![];
let mut cell_ids: Vec<u64> = vec![];
for data_column in data_columns {
for data_column in &data_columns {
let cell = data_column
.column
.get(row_index)
@@ -463,6 +465,7 @@ mod test {
test_reconstruct_data_columns(&kzg, &spec);
test_reconstruct_data_columns_unordered(&kzg, &spec);
test_reconstruct_blobs_from_data_columns(&kzg, &spec);
test_reconstruct_blobs_from_data_columns_unordered(&kzg, &spec);
test_validate_data_columns(&kzg, &spec);
}
@@ -595,7 +598,7 @@ mod test {
let blob_indices = vec![1, 2];
let reconstructed_blobs = reconstruct_blobs(
kzg,
&column_sidecars.iter().as_slice()[0..column_sidecars.len() / 2],
column_sidecars[0..column_sidecars.len() / 2].to_vec(),
Some(blob_indices.clone()),
&signed_blinded_block,
spec,
@@ -613,6 +616,31 @@ mod test {
}
}
#[track_caller]
fn test_reconstruct_blobs_from_data_columns_unordered(kzg: &Kzg, spec: &ChainSpec) {
let num_of_blobs = 2;
let (signed_block, blobs, proofs) =
create_test_fulu_block_and_blobs::<E>(num_of_blobs, spec);
let blob_refs = blobs.iter().collect::<Vec<_>>();
let column_sidecars =
blobs_to_data_column_sidecars(&blob_refs, proofs.to_vec(), &signed_block, kzg, spec)
.unwrap();
// Test reconstruction with columns in reverse order (non-ascending)
let mut subset_columns: Vec<_> =
column_sidecars.iter().as_slice()[0..column_sidecars.len() / 2].to_vec();
subset_columns.reverse(); // This would fail without proper sorting in reconstruct_blobs
let signed_blinded_block = signed_block.into();
let reconstructed_blobs =
reconstruct_blobs(kzg, subset_columns, None, &signed_blinded_block, spec).unwrap();
for (i, original_blob) in blobs.iter().enumerate() {
let reconstructed_blob = &reconstructed_blobs.get(i).unwrap().blob;
assert_eq!(reconstructed_blob, original_blob, "{i}");
}
}
fn get_kzg() -> Kzg {
Kzg::new_from_trusted_setup(&get_trusted_setup()).expect("should create kzg")
}

View File

@@ -474,7 +474,7 @@ impl BlockId {
)
.collect::<Result<Vec<_>, _>>()?;
reconstruct_blobs(&chain.kzg, &data_columns, blob_indices, block, &chain.spec).map_err(
reconstruct_blobs(&chain.kzg, data_columns, blob_indices, block, &chain.spec).map_err(
|e| {
warp_utils::reject::custom_server_error(format!(
"Error reconstructing data columns: {e:?}"

View File

@@ -153,14 +153,13 @@ fn verify_checksum(bytes: &[u8], expected_checksum: &str) {
/// Returns the directory that will be used to store the deposit contract ABI.
fn abi_dir() -> PathBuf {
let base = env::var("CARGO_MANIFEST_DIR")
.expect("should know manifest dir")
let base = env::var("OUT_DIR")
.expect("should know out dir")
.parse::<PathBuf>()
.expect("should parse manifest dir as path")
.join("contracts");
.expect("should parse out dir as path");
std::fs::create_dir_all(base.clone())
.expect("should be able to create abi directory in manifest");
.expect("should be able to create abi directory in out dir");
base
}

View File

@@ -44,15 +44,25 @@ impl From<SszDecodeError> for Error {
pub const CONTRACT_DEPLOY_GAS: usize = 4_000_000;
pub const DEPOSIT_GAS: usize = 400_000;
pub const ABI: &[u8] = include_bytes!("../contracts/v0.12.1_validator_registration.json");
pub const BYTECODE: &[u8] = include_bytes!("../contracts/v0.12.1_validator_registration.bytecode");
pub const ABI: &[u8] = include_bytes!(concat!(
env!("OUT_DIR"),
"/v0.12.1_validator_registration.json"
));
pub const BYTECODE: &[u8] = include_bytes!(concat!(
env!("OUT_DIR"),
"/v0.12.1_validator_registration.bytecode"
));
pub const DEPOSIT_DATA_LEN: usize = 420; // lol
pub mod testnet {
pub const ABI: &[u8] =
include_bytes!("../contracts/v0.12.1_testnet_validator_registration.json");
pub const BYTECODE: &[u8] =
include_bytes!("../contracts/v0.12.1_testnet_validator_registration.bytecode");
pub const ABI: &[u8] = include_bytes!(concat!(
env!("OUT_DIR"),
"/v0.12.1_testnet_validator_registration.json"
));
pub const BYTECODE: &[u8] = include_bytes!(concat!(
env!("OUT_DIR"),
"/v0.12.1_testnet_validator_registration.bytecode"
));
}
pub fn encode_eth1_tx_data(deposit_data: &DepositData) -> Result<Vec<u8>, Error> {

View File

@@ -21,7 +21,7 @@ cd ./scripts/local_testnet
```
It will build a Lighthouse docker image from the root of the directory and will take an approximately 12 minutes to complete. Once built, the testing will be started automatically. You will see a list of services running and "Started!" at the end.
You can also select your own Lighthouse docker image to use by specifying it in `network_params.yml` under the `cl_image` key.
You can also select your own Lighthouse docker image to use by specifying it in `network_params.yaml` under the `cl_image` key.
Full configuration reference for Kurtosis is specified [here](https://github.com/ethpandaops/ethereum-package?tab=readme-ov-file#configuration).
To view all running services:

View File

@@ -1,19 +1,37 @@
# Full configuration reference [here](https://github.com/ethpandaops/ethereum-package?tab=readme-ov-file#configuration).
participants:
- el_type: geth
el_image: ethereum/client-go:latest
cl_type: lighthouse
- cl_type: lighthouse
cl_image: lighthouse:local
el_type: geth
el_image: ethereum/client-go:latest
supernode: true
cl_extra_params:
- --target-peers=3
count: 4
count: 2
- cl_type: lighthouse
cl_image: lighthouse:local
el_type: geth
el_image: ethereum/client-go:latest
supernode: false
cl_extra_params:
- --target-peers=3
count: 2
network_params:
electra_fork_epoch: 0
seconds_per_slot: 3
global_log_level: debug
fulu_fork_epoch: 0
seconds_per_slot: 6
snooper_enabled: false
global_log_level: debug
additional_services:
- dora
- spamoor
- prometheus_grafana
- tempo
spamoor_params:
image: ethpandaops/spamoor:master
spammers:
- scenario: eoatx
config:
throughput: 200
- scenario: blobs
config:
throughput: 20

View File

@@ -1,41 +0,0 @@
participants:
- cl_type: lighthouse
cl_image: lighthouse:local
el_type: geth
el_image: ethpandaops/geth:master
supernode: true
cl_extra_params:
# Note: useful for testing range sync (only produce block if the node is in sync to prevent forking)
- --sync-tolerance-epochs=0
- --target-peers=3
count: 2
- cl_type: lighthouse
cl_image: lighthouse:local
el_type: geth
el_image: ethpandaops/geth:master
supernode: false
cl_extra_params:
# Note: useful for testing range sync (only produce block if the node is in sync to prevent forking)
- --sync-tolerance-epochs=0
- --target-peers=3
count: 2
network_params:
electra_fork_epoch: 0
fulu_fork_epoch: 1
seconds_per_slot: 6
snooper_enabled: false
global_log_level: debug
additional_services:
- dora
- spamoor
- prometheus_grafana
- tempo
spamoor_params:
image: ethpandaops/spamoor:master
spammers:
- scenario: eoatx
config:
throughput: 200
- scenario: blobs
config:
throughput: 20

View File

@@ -78,6 +78,11 @@ if [ "$RUN_ASSERTOOR_TESTS" = true ]; then
echo "Assertoor has been added to $NETWORK_PARAMS_FILE."
fi
if [ "$KEEP_ENCLAVE" = false ]; then
# Stop local testnet
kurtosis enclave rm -f $ENCLAVE_NAME 2>/dev/null || true
fi
if [ "$BUILD_IMAGE" = true ]; then
echo "Building Lighthouse Docker image."
ROOT_DIR="$SCRIPT_DIR/../.."
@@ -86,11 +91,6 @@ else
echo "Not rebuilding Lighthouse Docker image."
fi
if [ "$KEEP_ENCLAVE" = false ]; then
# Stop local testnet
kurtosis enclave rm -f $ENCLAVE_NAME 2>/dev/null || true
fi
kurtosis run --enclave $ENCLAVE_NAME github.com/ethpandaops/ethereum-package@$ETHEREUM_PKG_VERSION --args-file $NETWORK_PARAMS_FILE
echo "Started!"

View File

@@ -1,24 +0,0 @@
# Kurtosis config file to checkpoint sync to a running devnet supported by ethPandaOps and `ethereum-package`.
participants:
- cl_type: lighthouse
cl_image: lighthouse:local
el_type: geth
el_image: ethpandaops/geth:master
cl_extra_params:
- --disable-backfill-rate-limiting
supernode: true
- cl_type: lighthouse
cl_image: lighthouse:local
el_type: geth
el_image: ethpandaops/geth:master
cl_extra_params:
- --disable-backfill-rate-limiting
supernode: false
checkpoint_sync_enabled: true
checkpoint_sync_url: "https://checkpoint-sync.fusaka-devnet-3.ethpandaops.io"
global_log_level: debug
network_params:
network: fusaka-devnet-3

View File

@@ -15,7 +15,7 @@ use std::marker::PhantomData;
use std::path::Path;
use std::sync::Arc;
use task_executor::TaskExecutor;
use tracing::{error, info, warn};
use tracing::{error, info, instrument, warn};
use types::{
AbstractExecPayload, Address, AggregateAndProof, Attestation, BeaconBlock, BlindedPayload,
ChainSpec, ContributionAndProof, Domain, Epoch, EthSpec, Fork, Graffiti, Hash256,
@@ -242,6 +242,7 @@ impl<T: SlotClock + 'static, E: EthSpec> LighthouseValidatorStore<T, E> {
/// Returns a `SigningMethod` for `validator_pubkey` *only if* that validator is considered safe
/// by doppelganger protection.
#[instrument(skip_all, level = "debug")]
fn doppelganger_checked_signing_method(
&self,
validator_pubkey: PublicKeyBytes,
@@ -745,6 +746,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore for LighthouseValidatorS
}
}
#[instrument(skip_all)]
async fn sign_attestation(
&self,
validator_pubkey: PublicKeyBytes,

View File

@@ -12,6 +12,7 @@ parking_lot = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true }
task_executor = { workspace = true }
tracing = { workspace = true }
types = { workspace = true }
url = { workspace = true }
validator_metrics = { workspace = true }

View File

@@ -10,6 +10,7 @@ use reqwest::{Client, header::ACCEPT};
use std::path::PathBuf;
use std::sync::Arc;
use task_executor::TaskExecutor;
use tracing::instrument;
use types::*;
use url::Url;
use web3signer::{ForkInfo, MessageType, SigningRequest, SigningResponse};
@@ -131,6 +132,7 @@ impl SigningMethod {
}
/// Return the signature of `signable_message`, with respect to the `signing_context`.
#[instrument(skip_all, level = "debug")]
pub async fn get_signature<E: EthSpec, Payload: AbstractExecPayload<E>>(
&self,
signable_message: SignableMessage<'_, E, Payload>,

View File

@@ -11,6 +11,7 @@ use rusqlite::{OptionalExtension, Transaction, TransactionBehavior, params};
use std::fs::File;
use std::path::Path;
use std::time::Duration;
use tracing::instrument;
use types::{AttestationData, BeaconBlockHeader, Epoch, Hash256, PublicKeyBytes, SignedRoot, Slot};
type Pool = r2d2::Pool<SqliteConnectionManager>;
@@ -639,6 +640,7 @@ impl SlashingDatabase {
/// to prevent concurrent checks and inserts from resulting in slashable data being inserted.
///
/// This is the safe, externally-callable interface for checking attestations.
#[instrument(skip_all, level = "debug")]
pub fn check_and_insert_attestation(
&self,
validator_pubkey: &PublicKeyBytes,

View File

@@ -8,7 +8,7 @@ use std::ops::Deref;
use std::sync::Arc;
use task_executor::TaskExecutor;
use tokio::time::{Duration, Instant, sleep, sleep_until};
use tracing::{debug, error, info, trace, warn};
use tracing::{Instrument, Span, debug, error, info, info_span, instrument, trace, warn};
use tree_hash::TreeHash;
use types::{Attestation, AttestationData, ChainSpec, CommitteeIndex, EthSpec, Slot};
use validator_store::{Error as ValidatorStoreError, ValidatorStore};
@@ -180,8 +180,9 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
Ok(())
}
/// For each each required attestation, spawn a new task that downloads, signs and uploads the
/// attestation to the beacon node.
/// Spawn only one new task for attestation post-Electra
/// For each required aggregates, spawn a new task that downloads, signs and uploads the
/// aggregates to the beacon node.
fn spawn_attestation_tasks(&self, slot_duration: Duration) -> Result<(), String> {
let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?;
let duration_to_next_slot = self
@@ -189,6 +190,53 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
.duration_to_next_slot()
.ok_or("Unable to determine duration to next slot")?;
// Create and publish an `Attestation` for all validators only once
// as the committee_index is not included in AttestationData post-Electra
let attestation_duties: Vec<_> = self.duties_service.attesters(slot).into_iter().collect();
let attestation_service = self.clone();
let attestation_data_handle = self
.inner
.executor
.spawn_handle(
async move {
let attestation_data = attestation_service
.beacon_nodes
.first_success(|beacon_node| async move {
let _timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::ATTESTATIONS_HTTP_GET],
);
beacon_node
.get_validator_attestation_data(slot, 0)
.await
.map_err(|e| format!("Failed to produce attestation data: {:?}", e))
.map(|result| result.data)
})
.await
.map_err(|e| e.to_string())?;
attestation_service
.sign_and_publish_attestations(
slot,
&attestation_duties,
attestation_data.clone(),
)
.await
.map_err(|e| {
crit!(
error = format!("{:?}", e),
slot = slot.as_u64(),
"Error during attestation routine"
);
e
})?;
Ok::<AttestationData, String>(attestation_data)
},
"unaggregated attestation production",
)
.ok_or("Failed to spawn attestation data task")?;
// If a validator needs to publish an aggregate attestation, they must do so at 2/3
// through the slot. This delay triggers at this time
let aggregate_production_instant = Instant::now()
@@ -196,7 +244,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
.checked_sub(slot_duration / 3)
.unwrap_or_else(|| Duration::from_secs(0));
let duties_by_committee_index: HashMap<CommitteeIndex, Vec<DutyAndProof>> = self
let aggregate_duties_by_committee_index: HashMap<CommitteeIndex, Vec<DutyAndProof>> = self
.duties_service
.attesters(slot)
.into_iter()
@@ -207,24 +255,45 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
map
});
// Spawn a task that awaits the attestation data handle and then spawns aggregate tasks
let attestation_service_clone = self.clone();
let executor = self.inner.executor.clone();
self.inner.executor.spawn(
async move {
// Log an error if the handle fails and return, skipping aggregates
let attestation_data = match attestation_data_handle.await {
Ok(Some(Ok(data))) => data,
Ok(Some(Err(err))) => {
error!(?err, "Attestation production failed");
return;
}
Ok(None) | Err(_) => {
info!("Aborting attestation production due to shutdown");
return;
}
};
// For each committee index for this slot:
//
// - Create and publish an `Attestation` for all required validators.
// - Create and publish `SignedAggregateAndProof` for all aggregating validators.
duties_by_committee_index
.into_iter()
.for_each(|(committee_index, validator_duties)| {
// Spawn a separate task for each attestation.
self.inner.executor.spawn_ignoring_error(
self.clone().publish_attestations_and_aggregates(
// Create and publish `SignedAggregateAndProof` for all aggregating validators.
aggregate_duties_by_committee_index.into_iter().for_each(
|(committee_index, validator_duties)| {
let attestation_service = attestation_service_clone.clone();
let attestation_data = attestation_data.clone();
executor.spawn_ignoring_error(
attestation_service.handle_aggregates(
slot,
committee_index,
validator_duties,
aggregate_production_instant,
attestation_data,
),
"attestation publish",
"aggregate publish",
);
},
)
},
"attestation and aggregate publish",
);
});
// Schedule pruning of the slashing protection database once all unaggregated
// attestations have (hopefully) been signed, i.e. at the same time as aggregate
@@ -234,55 +303,26 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
Ok(())
}
/// Performs the first step of the attesting process: downloading `Attestation` objects,
/// signing them and returning them to the validator.
///
/// https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/validator.md#attesting
///
/// ## Detail
///
/// The given `validator_duties` should already be filtered to only contain those that match
/// `slot` and `committee_index`. Critical errors will be logged if this is not the case.
async fn publish_attestations_and_aggregates(
#[instrument(
name = "handle_aggregates",
skip_all,
fields(%slot, %committee_index)
)]
async fn handle_aggregates(
self,
slot: Slot,
committee_index: CommitteeIndex,
validator_duties: Vec<DutyAndProof>,
aggregate_production_instant: Instant,
attestation_data: AttestationData,
) -> Result<(), ()> {
let attestations_timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::ATTESTATIONS],
);
// There's not need to produce `Attestation` or `SignedAggregateAndProof` if we do not have
// There's not need to produce `SignedAggregateAndProof` if we do not have
// any validators for the given `slot` and `committee_index`.
if validator_duties.is_empty() {
return Ok(());
}
// Step 1.
//
// Download, sign and publish an `Attestation` for each validator.
let attestation_opt = self
.produce_and_publish_attestations(slot, committee_index, &validator_duties)
.await
.map_err(move |e| {
crit!(
error = format!("{:?}", e),
committee_index,
slot = slot.as_u64(),
"Error during attestation routine"
)
})?;
drop(attestations_timer);
// Step 2.
//
// If an attestation was produced, make an aggregate.
if let Some(attestation_data) = attestation_opt {
// First, wait until the `aggregation_production_instant` (2/3rds
// Wait until the `aggregation_production_instant` (2/3rds
// of the way though the slot). As verified in the
// `delay_triggers_when_in_the_past` test, this code will still run
// even if the instant has already elapsed.
@@ -294,48 +334,45 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
&[validator_metrics::AGGREGATES],
);
// Then download, sign and publish a `SignedAggregateAndProof` for each
// Download, sign and publish a `SignedAggregateAndProof` for each
// validator that is elected to aggregate for this `slot` and
// `committee_index`.
self.produce_and_publish_aggregates(
&attestation_data,
committee_index,
&validator_duties,
)
self.produce_and_publish_aggregates(&attestation_data, committee_index, &validator_duties)
.await
.map_err(move |e| {
crit!(
error = format!("{:?}", e),
committee_index,
slot = slot.as_u64(),
"Error during attestation routine"
"Error during aggregate attestation routine"
)
})?;
}
Ok(())
}
/// Performs the first step of the attesting process: downloading `Attestation` objects,
/// signing them and returning them to the validator.
/// Performs the main steps of the attesting process: signing and publishing to the BN.
///
/// https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/validator.md#attesting
/// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/validator.md#attesting
///
/// ## Detail
///
/// The given `validator_duties` should already be filtered to only contain those that match
/// `slot` and `committee_index`. Critical errors will be logged if this is not the case.
///
/// Only one `Attestation` is downloaded from the BN. It is then cloned and signed by each
/// validator and the list of individually-signed `Attestation` objects is returned to the BN.
async fn produce_and_publish_attestations(
/// `slot`. Critical errors will be logged if this is not the case.
#[instrument(skip_all, fields(%slot, %attestation_data.beacon_block_root))]
async fn sign_and_publish_attestations(
&self,
slot: Slot,
committee_index: CommitteeIndex,
validator_duties: &[DutyAndProof],
) -> Result<Option<AttestationData>, String> {
attestation_data: AttestationData,
) -> Result<(), String> {
let _attestations_timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::ATTESTATIONS],
);
if validator_duties.is_empty() {
return Ok(None);
return Ok(());
}
let current_epoch = self
@@ -344,25 +381,10 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
.ok_or("Unable to determine current slot from clock")?
.epoch(S::E::slots_per_epoch());
let attestation_data = self
.beacon_nodes
.first_success(|beacon_node| async move {
let _timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::ATTESTATIONS_HTTP_GET],
);
beacon_node
.get_validator_attestation_data(slot, committee_index)
.await
.map_err(|e| format!("Failed to produce attestation data: {:?}", e))
.map(|result| result.data)
})
.await
.map_err(|e| e.to_string())?;
// Create futures to produce signed `Attestation` objects.
let attestation_data_ref = &attestation_data;
let signing_futures = validator_duties.iter().map(|duty_and_proof| async move {
let signing_futures = validator_duties.iter().map(|duty_and_proof| {
async move {
let duty = &duty_and_proof.duty;
let attestation_data = attestation_data_ref;
@@ -418,7 +440,6 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
info = "a validator may have recently been removed from this VC",
pubkey = ?pubkey,
validator = ?duty.pubkey,
committee_index = committee_index,
slot = slot.as_u64(),
"Missing pubkey for attestation"
);
@@ -428,17 +449,22 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
crit!(
error = ?e,
validator = ?duty.pubkey,
committee_index,
slot = slot.as_u64(),
"Failed to sign attestation"
);
None
}
}
}
.instrument(Span::current())
});
// Execute all the futures in parallel, collecting any successful results.
let (ref attestations, ref validator_indices): (Vec<_>, Vec<_>) = join_all(signing_futures)
.instrument(info_span!(
"sign_attestations",
count = validator_duties.len()
))
.await
.into_iter()
.flatten()
@@ -446,7 +472,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
if attestations.is_empty() {
warn!("No attestations were published");
return Ok(None);
return Ok(());
}
let fork_name = self
.chain_spec
@@ -487,6 +513,10 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
.post_beacon_pool_attestations_v2::<S::E>(single_attestations, fork_name)
.await
})
.instrument(info_span!(
"publish_attestations",
count = attestations.len()
))
.await
{
Ok(()) => info!(
@@ -507,7 +537,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
),
}
Ok(Some(attestation_data))
Ok(())
}
/// Performs the second step of the attesting process: downloading an aggregated `Attestation`,
@@ -523,6 +553,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
/// Only one aggregated `Attestation` is downloaded from the BN. It is then cloned and signed
/// by each validator and the list of individually-signed `SignedAggregateAndProof` objects is
/// returned to the BN.
#[instrument(skip_all, fields(slot = %attestation_data.slot, %committee_index))]
async fn produce_and_publish_aggregates(
&self,
attestation_data: &AttestationData,
@@ -575,6 +606,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
.map(|result| result.data)
}
})
.instrument(info_span!("fetch_aggregate_attestation"))
.await
.map_err(|e| e.to_string())?;
@@ -617,7 +649,12 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
});
// Execute all the futures in parallel, collecting any successful results.
let aggregator_count = validator_duties
.iter()
.filter(|d| d.selection_proof.is_some())
.count();
let signed_aggregate_and_proofs = join_all(signing_futures)
.instrument(info_span!("sign_aggregates", count = aggregator_count))
.await
.into_iter()
.flatten()
@@ -647,6 +684,10 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
.await
}
})
.instrument(info_span!(
"publish_aggregates",
count = signed_aggregate_and_proofs.len()
))
.await
{
Ok(()) => {

View File

@@ -1,5 +1,4 @@
use beacon_node_fallback::{ApiTopic, BeaconNodeFallback, Error as FallbackError, Errors};
use bls::SignatureBytes;
use eth2::{BeaconNodeHttpClient, StatusCode};
use graffiti_file::{GraffitiFile, determine_graffiti};
use logging::crit;
@@ -11,7 +10,7 @@ use std::sync::Arc;
use std::time::Duration;
use task_executor::TaskExecutor;
use tokio::sync::mpsc;
use tracing::{debug, error, info, trace, warn};
use tracing::{Instrument, debug, error, info, info_span, instrument, trace, warn};
use types::{BlockType, ChainSpec, EthSpec, Graffiti, PublicKeyBytes, Slot};
use validator_store::{Error as ValidatorStoreError, SignedBlock, UnsignedBlock, ValidatorStore};
@@ -298,7 +297,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> BlockService<S, T> {
self.inner.executor.spawn(
async move {
let result = service
.publish_block(slot, validator_pubkey, builder_boost_factor)
.get_validator_block_and_publish_block(slot, validator_pubkey, builder_boost_factor)
.await;
match result {
@@ -320,6 +319,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> BlockService<S, T> {
}
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all, fields(%slot, ?validator_pubkey))]
async fn sign_and_publish_block(
&self,
proposer_fallback: ProposerFallback<T>,
@@ -333,6 +333,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> BlockService<S, T> {
let res = self
.validator_store
.sign_block(*validator_pubkey, unsigned_block, slot)
.instrument(info_span!("sign_block"))
.await;
let signed_block = match res {
@@ -389,7 +390,12 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> BlockService<S, T> {
Ok(())
}
async fn publish_block(
#[instrument(
name = "block_proposal_duty_cycle",
skip_all,
fields(%slot, ?validator_pubkey)
)]
async fn get_validator_block_and_publish_block(
self,
slot: Slot,
validator_pubkey: PublicKeyBytes,
@@ -442,22 +448,48 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> BlockService<S, T> {
info!(slot = slot.as_u64(), "Requesting unsigned block");
// Request block from first responsive beacon node.
// Request an SSZ block from all beacon nodes in order, returning on the first successful response.
// If all nodes fail, run a second pass falling back to JSON.
//
// Try the proposer nodes last, since it's likely that they don't have a
// Proposer nodes will always be tried last during each pass since it's likely that they don't have a
// great view of attestations on the network.
let unsigned_block = proposer_fallback
let ssz_block_response = proposer_fallback
.request_proposers_last(|beacon_node| async move {
let _get_timer = validator_metrics::start_timer_vec(
&validator_metrics::BLOCK_SERVICE_TIMES,
&[validator_metrics::BEACON_BLOCK_HTTP_GET],
);
Self::get_validator_block(
&beacon_node,
beacon_node
.get_validator_blocks_v3_ssz::<S::E>(
slot,
randao_reveal_ref,
graffiti,
proposer_index,
graffiti.as_ref(),
builder_boost_factor,
)
.await
})
.await;
let block_response = match ssz_block_response {
Ok((ssz_block_response, _metadata)) => ssz_block_response,
Err(e) => {
warn!(
slot = slot.as_u64(),
error = %e,
"SSZ block production failed, falling back to JSON"
);
proposer_fallback
.request_proposers_last(|beacon_node| async move {
let _get_timer = validator_metrics::start_timer_vec(
&validator_metrics::BLOCK_SERVICE_TIMES,
&[validator_metrics::BEACON_BLOCK_HTTP_GET],
);
let (json_block_response, _metadata) = beacon_node
.get_validator_blocks_v3::<S::E>(
slot,
randao_reveal_ref,
graffiti.as_ref(),
builder_boost_factor,
)
.await
@@ -466,9 +498,30 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> BlockService<S, T> {
"Error from beacon node when producing block: {:?}",
e
))
})?;
Ok(json_block_response.data)
})
})
.await?;
.await
.map_err(BlockError::from)?
}
};
let (block_proposer, unsigned_block) = match block_response {
eth2::types::ProduceBlockV3Response::Full(block) => {
(block.block().proposer_index(), UnsignedBlock::Full(block))
}
eth2::types::ProduceBlockV3Response::Blinded(block) => {
(block.proposer_index(), UnsignedBlock::Blinded(block))
}
};
info!(slot = slot.as_u64(), "Received unsigned block");
if proposer_index != Some(block_proposer) {
return Err(BlockError::Recoverable(
"Proposer index does not match block proposer. Beacon chain re-orged".to_string(),
));
}
self_ref
.sign_and_publish_block(
@@ -483,6 +536,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> BlockService<S, T> {
Ok(())
}
#[instrument(skip_all)]
async fn publish_signed_block_contents(
&self,
signed_block: &SignedBlock<S::E>,
@@ -517,70 +571,6 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> BlockService<S, T> {
}
Ok::<_, BlockError>(())
}
async fn get_validator_block(
beacon_node: &BeaconNodeHttpClient,
slot: Slot,
randao_reveal_ref: &SignatureBytes,
graffiti: Option<Graffiti>,
proposer_index: Option<u64>,
builder_boost_factor: Option<u64>,
) -> Result<UnsignedBlock<S::E>, BlockError> {
let block_response = match beacon_node
.get_validator_blocks_v3_ssz::<S::E>(
slot,
randao_reveal_ref,
graffiti.as_ref(),
builder_boost_factor,
)
.await
{
Ok((ssz_block_response, _)) => ssz_block_response,
Err(e) => {
warn!(
slot = slot.as_u64(),
error = %e,
"Beacon node does not support SSZ in block production, falling back to JSON"
);
let (json_block_response, _) = beacon_node
.get_validator_blocks_v3::<S::E>(
slot,
randao_reveal_ref,
graffiti.as_ref(),
builder_boost_factor,
)
.await
.map_err(|e| {
BlockError::Recoverable(format!(
"Error from beacon node when producing block: {:?}",
e
))
})?;
// Extract ProduceBlockV3Response (data field of the struct ForkVersionedResponse)
json_block_response.data
}
};
let (block_proposer, unsigned_block) = match block_response {
eth2::types::ProduceBlockV3Response::Full(block) => {
(block.block().proposer_index(), UnsignedBlock::Full(block))
}
eth2::types::ProduceBlockV3Response::Blinded(block) => {
(block.proposer_index(), UnsignedBlock::Blinded(block))
}
};
info!(slot = slot.as_u64(), "Received unsigned block");
if proposer_index != Some(block_proposer) {
return Err(BlockError::Recoverable(
"Proposer index does not match block proposer. Beacon chain re-orged".to_string(),
));
}
Ok::<_, BlockError>(unsigned_block)
}
}
/// Wrapper for values we want to log about a block we signed, for easy extraction from the possible

View File

@@ -11,7 +11,7 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use task_executor::TaskExecutor;
use tokio::time::{Duration, Instant, sleep, sleep_until};
use tracing::{debug, error, info, trace, warn};
use tracing::{Instrument, debug, error, info, info_span, instrument, trace, warn};
use types::{
ChainSpec, EthSpec, Hash256, PublicKeyBytes, Slot, SyncCommitteeSubscription,
SyncContributionData, SyncDuty, SyncSelectionProof, SyncSubnetId,
@@ -208,7 +208,8 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> SyncCommitteeService<S
.publish_sync_committee_signatures(slot, block_root, validator_duties)
.map(|_| ())
.await
},
}
.instrument(info_span!("sync_committee_signature_publish", %slot)),
"sync_committee_signature_publish",
);
@@ -225,7 +226,8 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> SyncCommitteeService<S
)
.map(|_| ())
.await
},
}
.instrument(info_span!("sync_committee_aggregate_publish", %slot)),
"sync_committee_aggregate_publish",
);
@@ -233,6 +235,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> SyncCommitteeService<S
}
/// Publish sync committee signatures.
#[instrument(skip_all, fields(%slot, ?beacon_block_root))]
async fn publish_sync_committee_signatures(
&self,
slot: Slot,
@@ -277,6 +280,10 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> SyncCommitteeService<S
// Execute all the futures in parallel, collecting any successful results.
let committee_signatures = &join_all(signature_futures)
.instrument(info_span!(
"sign_sync_signatures",
count = validator_duties.len()
))
.await
.into_iter()
.flatten()
@@ -288,6 +295,10 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> SyncCommitteeService<S
.post_beacon_pool_sync_committee_signatures(committee_signatures)
.await
})
.instrument(info_span!(
"publish_sync_signatures",
count = committee_signatures.len()
))
.await
.map_err(|e| {
error!(
@@ -328,7 +339,8 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> SyncCommitteeService<S
)
.map(|_| ())
.await
},
}
.instrument(info_span!("publish_sync_committee_aggregate_for_subnet", %slot, ?beacon_block_root, %subnet_id)),
"sync_committee_aggregate_publish_subnet",
);
}
@@ -357,6 +369,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> SyncCommitteeService<S
.get_validator_sync_committee_contribution(&sync_contribution_data)
.await
})
.instrument(info_span!("fetch_sync_contribution"))
.await
.map_err(|e| {
crit!(
@@ -372,6 +385,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> SyncCommitteeService<S
.data;
// Create futures to produce signed contributions.
let aggregator_count = subnet_aggregators.len();
let signature_futures = subnet_aggregators.into_iter().map(
|(aggregator_index, aggregator_pk, selection_proof)| async move {
match self
@@ -405,6 +419,10 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> SyncCommitteeService<S
// Execute all the futures in parallel, collecting any successful results.
let signed_contributions = &join_all(signature_futures)
.instrument(info_span!(
"sign_sync_contributions",
count = aggregator_count
))
.await
.into_iter()
.flatten()
@@ -417,6 +435,10 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> SyncCommitteeService<S
.post_validator_contribution_and_proofs(signed_contributions)
.await
})
.instrument(info_span!(
"publish_sync_contributions",
count = signed_contributions.len()
))
.await
.map_err(|e| {
error!(