Skip to content
1 change: 1 addition & 0 deletions core/grpc/proto/metastore-status.v1.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ enum RequestProcessingStatus {
COMPLETED = 1; // Request has completed successfully
FAILED = 2; // Request has failed
ANY = 3; // Special value to query any status
DELETED = 4; // Request has been deleted
}

// Information about the status of a specific request
Expand Down
173 changes: 86 additions & 87 deletions core/service/src/engine/centralized/service/crs_gen.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::sync::Arc;

use aes_prng::AesRng;
use alloy_sol_types::Eip712Domain;
use anyhow::Result;
Expand All @@ -10,6 +8,7 @@ use observability::metrics_names::{
CENTRAL_TAG, OP_CRS_GEN_ABORT, OP_CRS_GEN_REQUEST, OP_CRS_GEN_RESULT,
OP_INSECURE_CRS_GEN_REQUEST, TAG_PARTY_ID,
};
use std::sync::Arc;
use threshold_execution::tfhe_internals::parameters::DKGParams;
use tokio_util::sync::CancellationToken;

Expand All @@ -26,7 +25,8 @@ use crate::engine::validation::{
RequestIdParsingErr, parse_grpc_request_id, validate_crs_gen_request,
};
use crate::util::meta_store::{
MetaStore, add_req_to_meta_store, retrieve_from_meta_store, update_err_req_in_meta_store,
MetaStore, MetaStorePermit, add_req_to_meta_store, retrieve_from_meta_store,
update_err_req_in_meta_store,
};
use crate::vault::storage::crypto_material::CentralizedCryptoMaterialStorage;
use crate::vault::storage::{Storage, StorageExt};
Expand Down Expand Up @@ -74,17 +74,14 @@ pub async fn crs_gen_impl<

// check that the request ID is not used yet
// and then insert the request ID only if it's unused
// all validation must be done before inserting the request ID
add_req_to_meta_store(
&mut service.crs_meta_map.write().await,
&verified.req_id,
op_tag,
)?;
// all validation must be done before inserting the request ID.
// The meta-store permit is threaded into crs_gen_background and consumed
// there by either the error arm (abort or generation failure) or write_crs.
let meta_permit =
add_req_to_meta_store(&service.crs_meta_map, &verified.req_id, op_tag).await?;

let meta_store = Arc::clone(&service.crs_meta_map);
let meta_store_cancel = Arc::clone(&service.crs_meta_map);
let crypto_storage = service.crypto_storage.clone();
let crypto_storage_cancel = service.crypto_storage.clone();
let sk = service
.base_kms
.sig_key()
Expand Down Expand Up @@ -115,46 +112,26 @@ pub async fn crs_gen_impl<
async move {
let _timer = timer;
let _permit = permit;
tokio::select! {
() = crs_gen_background(
&req_id,
&epoch_id,
rng,
meta_store,
crypto_storage,
sk,
verified.params,
verified.eip712_domain,
verified.extra_data,
max_bits,
op_tag,
) => {
tracing::info!("CRS generation of request {} exiting normally.", req_id);
// Remove cancellation token since generation is now done.
ongoing.lock().await.remove(&req_id);
},
() = token.cancelled() => {
MetricedError::handle_unreturnable_error(
OP_CRS_GEN_REQUEST,
Some(req_id),
anyhow::anyhow!("CRS generation of request {} exiting before completion because of an abort request.", req_id),
);
let del_res = crypto_storage_cancel.inner.purge_crs_material(&req_id, &epoch_id).await;
{
let mut guarded_meta_store = meta_store_cancel.write().await;
let msg = if del_res {
let msg = format!("CRS generation aborted and CRS material deleted successfully for request {}", req_id);
tracing::info!(msg);
msg
} else {
let msg = format!("CRS generation aborted but failed to delete CRS material for request {}", req_id);
tracing::error!(msg);
msg
};
update_err_req_in_meta_store(&mut guarded_meta_store, &req_id, msg, OP_CRS_GEN_REQUEST);
}
}
}
crs_gen_background(
meta_permit,
token,
&req_id,
&epoch_id,
rng,
meta_store,
crypto_storage,
sk,
verified.params,
verified.eip712_domain,
verified.extra_data,
max_bits,
op_tag,
)
.await;
// Cleanup runs on every termination (normal completion, generation
// error, or abort) — the cancellation handling now lives inside
// `crs_gen_background`.
ongoing.lock().await.remove(&req_id);
}
.instrument(tracing::Span::current()),
);
Expand Down Expand Up @@ -183,10 +160,9 @@ pub async fn get_crs_gen_result_impl<
.map_err(|e| MetricedError::new(op_tag, None, e, tonic::Code::InvalidArgument))?;
tracing::debug!("Received CRS gen result request with id {}", request_id);

let crs_info =
retrieve_from_meta_store(service.crs_meta_map.read().await, &request_id, op_tag).await?;
let crs_info = retrieve_from_meta_store(&service.crs_meta_map, &request_id, op_tag).await?;

match crs_info {
match (*crs_info).clone() {
CrsGenMetadata::LegacyV0(_) => {
// This is a legacy result, we cannot return the crs_digest or external_signature
// as they're signed using a different SolStruct and hashed using a different domain separator
Expand Down Expand Up @@ -259,12 +235,21 @@ pub async fn abort_crs_gen_impl<
}
}

/// Background task for CRS generation
/// Background task for CRS generation.
///
/// Owns the meta-store permit for the entire lifetime of the request. The
/// cancellation token is checked only during the long-running
/// [`async_generate_crs`] phase via an inner `tokio::select!`; once
/// generation succeeds, the disk-write phase is intentionally uncancellable
/// so we never end up with "purged material on disk" + "Done state in
/// meta store".
#[allow(clippy::too_many_arguments)]
pub(crate) async fn crs_gen_background<
PubS: Storage + Send + Sync + 'static,
PrivS: StorageExt + Send + Sync + 'static,
>(
permit: MetaStorePermit,
cancel_token: CancellationToken,
req_id: &RequestId,
epoch_id: &EpochId,
rng: AesRng,
Expand All @@ -279,42 +264,56 @@ pub(crate) async fn crs_gen_background<
) {
let start = tokio::time::Instant::now();

let (pp, crs_info) = match async_generate_crs(
&sk,
params,
max_number_bits,
eip712_domain,
extra_data,
req_id,
rng,
)
.await
{
Ok((pp, crs_info)) => (pp, crs_info),
Err(e) => {
let _ = update_err_req_in_meta_store(
&mut meta_store.write().await,
// Race the long-running generation against cancellation.
let outcome: Result<(tfhe::zk::CompactPkeCrs, CrsGenMetadata), String> = tokio::select! {
biased;
() = cancel_token.cancelled() => Err(format!("CRS generation aborted for request {req_id}")),
result = async_generate_crs(
&sk, params, max_number_bits, eip712_domain, extra_data, req_id, rng,
) => result.map_err(|e| e.to_string()),
};

match outcome {
Err(msg) => {
tracing::error!("{msg}");
let del_res = crypto_storage
.inner
.purge_crs_material(req_id, epoch_id)
.await;
let msg = if del_res {
let m = format!(
"CRS generation aborted and CRS material deleted successfully for request {req_id}"
);
tracing::info!(m);
m
} else {
let m = format!(
"CRS generation aborted but failed to delete CRS material for request {req_id}"
);
tracing::error!(m);
m
};

let _ = update_err_req_in_meta_store(&meta_store, permit, msg, op_tag).await;
}
Ok((pp, crs_info)) => {
// write_crs consumes the permit via update_meta_store internally
// (Ok → update_ok_req_in_meta_store, Err → update_err).
if let Err(e) = crypto_storage
.inner
.write_crs(req_id, epoch_id, pp, crs_info, meta_store, permit, op_tag)
.await
{
tracing::error!("Failed to write CRS to storage: {e}");
return;
}
tracing::info!(
"⏱️ Core Event Time for CRS-gen request id {}: {:?}",
req_id,
e.to_string(),
op_tag,
start.elapsed()
);
return;
}
};

if let Err(e) = crypto_storage
.inner
.write_crs(req_id, epoch_id, pp, crs_info, meta_store, op_tag)
.await
{
tracing::error!("Failed to write CRS to storage: {e}");
return;
}
tracing::info!(
"⏱️ Core Event Time for CRS-gen request id {}: {:?}",
req_id,
start.elapsed()
);
}

#[cfg(test)]
Expand Down
41 changes: 22 additions & 19 deletions core/service/src/engine/centralized/service/decryption.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,12 @@ pub async fn user_decrypt_impl<

let meta_store = Arc::clone(&service.user_dec_meta_store);
let mut rng = service.base_kms.new_rng().await;
add_req_to_meta_store(
&mut service.user_dec_meta_store.write().await,
let meta_permit = add_req_to_meta_store(
&service.user_dec_meta_store,
&request_id,
OP_USER_DECRYPT_REQUEST,
)?;
)
.await?;
let sig_key = service.base_kms.sig_key().map_err(|e| {
MetricedError::new(
OP_USER_DECRYPT_REQUEST,
Expand All @@ -110,6 +111,7 @@ pub async fn user_decrypt_impl<
async move {
let _timer = timer;
let _permit = permit;
let meta_permit = meta_permit;

tracing::info!(
"Starting user decryption using key_id {} for request ID {}",
Expand All @@ -131,11 +133,12 @@ pub async fn user_decrypt_impl<
.await;
let res_with_extra_data = res.map(|(payload, sig)| (payload, sig, extra_data));
let _ = update_req_in_meta_store(
&mut meta_store.write().await,
&request_id,
&meta_store,
meta_permit,
res_with_extra_data,
OP_USER_DECRYPT_REQUEST,
);
)
.await;
}
.instrument(tracing::Span::current()),
);
Expand Down Expand Up @@ -165,12 +168,13 @@ pub async fn get_user_decryption_result_impl<
},
)?;

let (payload, external_signature, extra_data) = retrieve_from_meta_store(
service.user_dec_meta_store.read().await,
let arc = retrieve_from_meta_store(
&service.user_dec_meta_store,
&request_id,
OP_USER_DECRYPT_RESULT,
)
.await?;
let (payload, external_signature, extra_data) = (*arc).clone();

// sign the response
let sig_payload_vec = bc2wrap::serialize(&payload).map_err(|e| {
Expand Down Expand Up @@ -258,11 +262,12 @@ pub async fn public_decrypt_impl<

// if the request already exists, then return the AlreadyExists error
// otherwise attempt to insert it to the meta store
add_req_to_meta_store(
&mut service.pub_dec_meta_store.write().await,
let meta_permit = add_req_to_meta_store(
&service.pub_dec_meta_store,
&request_id,
OP_PUBLIC_DECRYPT_REQUEST,
)?;
)
.await?;

let meta_store = Arc::clone(&service.pub_dec_meta_store);
let sig_key = service.base_kms.sig_key().map_err(|e| {
Expand All @@ -279,6 +284,7 @@ pub async fn public_decrypt_impl<
let _handle = tokio::spawn(async move {
let _timer = timer;
let _permit = permit;
let meta_permit = meta_permit;
tracing::info!(
"Starting decryption using key_id {} for request ID {}",
&key_id,
Expand Down Expand Up @@ -315,12 +321,8 @@ pub async fn public_decrypt_impl<
Err(e) => Err(format!("Error collecting decrypt result: {e:?}")),
Ok(Err(e)) => Err(format!("Error during decryption computation: {e}")),
};
let _ = update_req_in_meta_store(
&mut meta_store.write().await,
&request_id,
res,
OP_PUBLIC_DECRYPT_REQUEST,
);
let _ = update_req_in_meta_store(&meta_store, meta_permit, res, OP_PUBLIC_DECRYPT_REQUEST)
.await;
tracing::info!(
"⏱️ Core Event Time for decryption computation: {:?}",
start.elapsed()
Expand Down Expand Up @@ -355,12 +357,13 @@ pub async fn get_public_decryption_result_impl<
})?;
tracing::debug!("Received get key gen result request with id {}", request_id);

let (retrieved_req_id, plaintexts, external_signature, extra_data) = retrieve_from_meta_store(
service.pub_dec_meta_store.read().await,
let arc = retrieve_from_meta_store(
&service.pub_dec_meta_store,
&request_id,
OP_PUBLIC_DECRYPT_RESULT,
)
.await?;
let (retrieved_req_id, plaintexts, external_signature, extra_data) = (*arc).clone();

if retrieved_req_id != request_id {
return Err(MetricedError::new(
Expand Down
15 changes: 6 additions & 9 deletions core/service/src/engine/centralized/service/initiator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,14 @@ pub async fn init_impl<
));
}
}
add_req_to_meta_store(
&mut service.epoch_ids.write().await,
let permit = add_req_to_meta_store(
&service.epoch_ids,
&verified_request.epoch_id.into(),
OP_NEW_EPOCH,
)?;
update_req_in_meta_store::<(), anyhow::Error>(
&mut service.epoch_ids.write().await,
&verified_request.epoch_id.into(),
Ok(()),
OP_NEW_EPOCH,
);
)
.await?;
update_req_in_meta_store::<(), anyhow::Error>(&service.epoch_ids, permit, Ok(()), OP_NEW_EPOCH)
.await;
tracing::warn!(
"Init called on centralized KMS with ID {} - no action taken",
verified_request.epoch_id
Expand Down
Loading
Loading