blob production

This commit is contained in:
realbigsean
2022-10-05 17:14:45 -04:00
parent 91efb9d4c7
commit b5b4ce9509
17 changed files with 623 additions and 168 deletions

View File

@@ -6,13 +6,16 @@ use crate::{
};
use crate::{http_metrics::metrics, validator_store::ValidatorStore};
use environment::RuntimeContext;
use eth2::types::Graffiti;
use eth2::types::{Graffiti, VariableList};
use slog::{crit, debug, error, info, trace, warn};
use slot_clock::SlotClock;
use std::ops::Deref;
use std::sync::Arc;
use tokio::sync::mpsc;
use types::{BlindedPayload, BlockType, EthSpec, ExecPayload, FullPayload, PublicKeyBytes, Slot};
use types::{
BlindedPayload, BlobsSidecar, BlockType, EthSpec, ExecPayload, ForkName, FullPayload,
PublicKeyBytes, Slot,
};
#[derive(Debug)]
pub enum BlockError {
@@ -316,126 +319,285 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
let proposer_index = self.validator_store.validator_index(&validator_pubkey);
let validator_pubkey_ref = &validator_pubkey;
// Request block from first responsive beacon node.
let block = self
.beacon_nodes
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async move {
let block = match Payload::block_type() {
BlockType::Full => {
let _get_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES,
&[metrics::BEACON_BLOCK_HTTP_GET],
);
beacon_node
.get_validator_blocks::<E, Payload>(
slot,
randao_reveal_ref,
graffiti.as_ref(),
)
.await
.map_err(|e| {
BlockError::Recoverable(format!(
"Error from beacon node when producing block: {:?}",
e
))
})?
.data
}
BlockType::Blinded => {
let _get_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES,
&[metrics::BLINDED_BEACON_BLOCK_HTTP_GET],
);
beacon_node
.get_validator_blinded_blocks::<E, Payload>(
slot,
randao_reveal_ref,
graffiti.as_ref(),
)
.await
.map_err(|e| {
BlockError::Recoverable(format!(
"Error from beacon node when producing block: {:?}",
e
))
})?
.data
}
};
match self.context.eth2_config.spec.fork_name_at_slot::<E>(slot) {
ForkName::Base | ForkName::Altair | ForkName::Merge => {
// Request block from first responsive beacon node.
let block = self
.beacon_nodes
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async move {
let block = match Payload::block_type() {
BlockType::Full => {
let _get_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES,
&[metrics::BEACON_BLOCK_HTTP_GET],
);
beacon_node
.get_validator_blocks::<E, Payload>(
slot,
randao_reveal_ref,
graffiti.as_ref(),
)
.await
.map_err(|e| {
BlockError::Recoverable(format!(
"Error from beacon node when producing block: {:?}",
e
))
})?
.data
}
BlockType::Blinded => {
let _get_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES,
&[metrics::BLINDED_BEACON_BLOCK_HTTP_GET],
);
beacon_node
.get_validator_blinded_blocks::<E, Payload>(
slot,
randao_reveal_ref,
graffiti.as_ref(),
)
.await
.map_err(|e| {
BlockError::Recoverable(format!(
"Error from beacon node when producing block: {:?}",
e
))
})?
.data
}
};
if proposer_index != Some(block.proposer_index()) {
return Err(BlockError::Recoverable(
"Proposer index does not match block proposer. Beacon chain re-orged"
.to_string(),
));
}
if proposer_index != Some(block.proposer_index()) {
return Err(BlockError::Recoverable(
"Proposer index does not match block proposer. Beacon chain re-orged"
.to_string(),
));
}
Ok::<_, BlockError>(block)
},
)
.await?;
Ok::<_, BlockError>(block)
},
)
.await?;
let signed_block = self_ref
.validator_store
.sign_block::<Payload>(*validator_pubkey_ref, block, current_slot)
.await
.map_err(|e| BlockError::Recoverable(format!("Unable to sign block: {:?}", e)))?;
let signed_block = self_ref
.validator_store
.sign_block::<Payload>(*validator_pubkey_ref, block, current_slot)
.await
.map_err(|e| {
BlockError::Recoverable(format!("Unable to sign block: {:?}", e))
})?;
// Publish block with first available beacon node.
self.beacon_nodes
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async {
match Payload::block_type() {
BlockType::Full => {
let _post_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES,
&[metrics::BEACON_BLOCK_HTTP_POST],
);
beacon_node
.post_beacon_blocks(&signed_block)
.await
.map_err(|e| {
BlockError::Irrecoverable(format!(
"Error from beacon node when publishing block: {:?}",
e
))
})?
}
BlockType::Blinded => {
let _post_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES,
&[metrics::BLINDED_BEACON_BLOCK_HTTP_POST],
);
beacon_node
.post_beacon_blinded_blocks(&signed_block)
.await
.map_err(|e| {
BlockError::Irrecoverable(format!(
"Error from beacon node when publishing block: {:?}",
e
))
})?
}
}
Ok::<_, BlockError>(())
},
)
.await?;
info!(
log,
"Successfully published block";
"block_type" => ?Payload::block_type(),
"deposits" => signed_block.message().body().deposits().len(),
"attestations" => signed_block.message().body().attestations().len(),
"graffiti" => ?graffiti.map(|g| g.as_utf8_lossy()),
"slot" => signed_block.slot().as_u64(),
);
}
ForkName::Eip4844 => {
if matches!(Payload::block_type(), BlockType::Blinded) {
//FIXME(sean)
crit!(
log,
"`--builder-payloads` not yet supported for EIP-4844 fork"
);
return Ok(());
}
// Request block from first responsive beacon node.
let block_and_blobs = self
.beacon_nodes
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async move {
let _get_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES,
&[metrics::BEACON_BLOCK_HTTP_GET],
);
let block_and_blobs = beacon_node
.get_validator_blocks_and_blobs::<E, Payload>(
slot,
randao_reveal_ref,
graffiti.as_ref(),
)
.await
.map_err(|e| {
BlockError::Recoverable(format!(
"Error from beacon node when producing block: {:?}",
e
))
})?
.data;
if proposer_index != Some(block_and_blobs.block.proposer_index()) {
return Err(BlockError::Recoverable(
"Proposer index does not match block proposer. Beacon chain re-orged"
.to_string(),
));
}
Ok::<_, BlockError>(block_and_blobs)
},
)
.await?;
let blobs_sidecar = BlobsSidecar {
beacon_block_root: block_and_blobs.block.canonical_root(),
beacon_block_slot: block_and_blobs.block.slot(),
blobs: VariableList::from(block_and_blobs.blobs),
kzg_aggregate_proof: block_and_blobs.kzg_aggregate_proof,
};
let block = block_and_blobs.block;
let block_publish_future = async {
let signed_block = self_ref
.validator_store
.sign_block::<Payload>(*validator_pubkey_ref, block, current_slot)
.await
.map_err(|e| {
BlockError::Recoverable(format!("Unable to sign block: {:?}", e))
})?;
// Publish block with first available beacon node.
self.beacon_nodes
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async {
let _post_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES,
&[metrics::BEACON_BLOCK_HTTP_POST],
);
beacon_node
.post_beacon_blocks(&signed_block)
.await
.map_err(|e| {
BlockError::Irrecoverable(format!(
"Error from beacon node when publishing block: {:?}",
e
))
})?;
Ok::<_, BlockError>(())
},
)
.await?;
info!(
log,
"Successfully published block";
"block_type" => ?Payload::block_type(),
"deposits" => signed_block.message().body().deposits().len(),
"attestations" => signed_block.message().body().attestations().len(),
"graffiti" => ?graffiti.map(|g| g.as_utf8_lossy()),
"slot" => signed_block.slot().as_u64(),
);
// Publish block with first available beacon node.
self.beacon_nodes
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async {
match Payload::block_type() {
BlockType::Full => {
let _post_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES,
&[metrics::BEACON_BLOCK_HTTP_POST],
);
beacon_node
.post_beacon_blocks(&signed_block)
.await
.map_err(|e| {
BlockError::Irrecoverable(format!(
"Error from beacon node when publishing block: {:?}",
e
))
})?
}
BlockType::Blinded => {
let _post_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES,
&[metrics::BLINDED_BEACON_BLOCK_HTTP_POST],
);
beacon_node
.post_beacon_blinded_blocks(&signed_block)
.await
.map_err(|e| {
BlockError::Irrecoverable(format!(
"Error from beacon node when publishing block: {:?}",
e
))
})?
}
}
Ok::<_, BlockError>(())
},
)
.await?;
};
let blob_publish_future = async {
let signed_blobs = self_ref
.validator_store
.sign_blobs(*validator_pubkey_ref, blobs_sidecar, current_slot)
.await
.map_err(|e| {
BlockError::Recoverable(format!("Unable to sign blob: {:?}", e))
})?;
// Publish block with first available beacon node.
self.beacon_nodes
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async {
let _post_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES,
&[metrics::BEACON_BLOB_HTTP_POST],
);
beacon_node.post_beacon_blobs(&signed_blobs).await.map_err(
|e| {
BlockError::Irrecoverable(format!(
"Error from beacon node when publishing blob: {:?}",
e
))
},
)?;
Ok::<_, BlockError>(())
},
)
.await?;
info!(
log,
"Successfully published blobs";
"block_type" => ?Payload::block_type(),
"slot" => signed_blobs.message.beacon_block_slot.as_u64(),
"block_root" => ?signed_blobs.message.beacon_block_root,
"blobs_len" => signed_blobs.message.blobs.len(),
);
Ok::<_, BlockError>(())
};
let (res_block, res_blob) = tokio::join!(block_publish_future, blob_publish_future);
res_block?;
res_blob?;
}
}
info!(
log,
"Successfully published block";
"block_type" => ?Payload::block_type(),
"deposits" => signed_block.message().body().deposits().len(),
"attestations" => signed_block.message().body().attestations().len(),
"graffiti" => ?graffiti.map(|g| g.as_utf8_lossy()),
"slot" => signed_block.slot().as_u64(),
);
Ok(())
}
}