Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 129 additions & 84 deletions src/units/zone_signer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use bytes::Bytes;
use cascade_zonedata::{OldRecord, RegularRecord, SignedZoneBuilder};
use domain::base::iana::SecurityAlgorithm;
use domain::base::name::FlattenInto;
use domain::base::{CanonicalOrd, Name, Record};
use domain::base::{CanonicalOrd, Name, Record, RelativeName};
use domain::crypto::sign::{SecretKeyBytes, SignRaw};
use domain::dnssec::common::parse_from_bind;
use domain::dnssec::sign::SigningConfig;
Expand All @@ -26,7 +26,7 @@ use domain::dnssec::sign::signatures::rrsigs::{GenerateRrsigConfig, sign_sorted_
use domain::new::base::{RType, Serial};
use domain::new::rdata::RecordData;
use domain::rdata::dnssec::Timestamp;
use domain::rdata::{Dnskey, Nsec3param, ZoneRecordData};
use domain::rdata::{Dnskey, Nsec3param};
use domain::zonefile::inplace::{Entry, Zonefile};
use domain_kmip::KeyUrl;
use domain_kmip::dep::kmip::client::pool::{ConnectionManager, KmipConnError, SyncConnPool};
Expand Down Expand Up @@ -271,7 +271,7 @@ impl ZoneSigner {
}

pub fn on_publish_signed_zone(&self, center: &Arc<Center>) {
trace!("[ZS]: a zone is published, recompute next time to re-sign");
trace!("a zone is published, recompute next time to re-sign");
let _ = self.next_resign_time_tx.send(self.next_resign_time(center));
}

Expand All @@ -281,7 +281,7 @@ impl ZoneSigner {
zone: &Arc<Zone>,
) -> (Arc<RwLock<SigningStatusPerZone>>, [OwnedSemaphorePermit; 3]) {
let zone_name = &zone.name;
info!("[ZS]: Waiting to enqueue signing operation for zone '{zone_name}'.");
info!("Waiting to enqueue signing operation for zone '{zone_name}'.");

self.signer_status.dump_queue();

Expand All @@ -297,7 +297,7 @@ impl ZoneSigner {
let num_ops_in_progress =
self.max_concurrent_operations - self.concurrent_operation_permits.available_permits();
info!(
"[ZS]: Waiting to start signing operation for zone '{zone_name}': {num_ops_in_progress} signing operations are in progress and {} operations are queued ahead of us.",
"Waiting to start signing operation for zone '{zone_name}': {num_ops_in_progress} signing operations are in progress and {} operations are queued ahead of us.",
q_size - 1
);

Expand Down Expand Up @@ -326,7 +326,7 @@ impl ZoneSigner {
return sign_incrementally(self, patcher, zone, center, status);
}

info!("[ZS]: Starting signing operation for zone '{zone_name}'");
info!("Starting signing operation for zone '{zone_name}'");
let start = Instant::now();

let (last_signed_serial, policy) = {
Expand Down Expand Up @@ -410,7 +410,7 @@ impl ZoneSigner {
};

info!(
"[ZS]: Serials for zone '{zone_name}': last signed={last_signed_serial:?}, current={loaded_serial}, serial policy={}, new={serial}",
"Serials for zone '{zone_name}': last signed={last_signed_serial:?}, current={loaded_serial}, serial policy={}, new={serial}",
policy.signer.serial_policy
);

Expand All @@ -437,9 +437,8 @@ impl ZoneSigner {
// Convert zone records into a form we can sign.
//
status.write().unwrap().current_action = "Collecting records to sign".to_string();
debug!("[ZS]: Collecting records to sign for zone '{zone_name}'.");
debug!("Collecting records to sign for zone '{zone_name}'.");
let walk_start = Instant::now();
// TODO: Filter out DNSSEC records from the loaded instance.
let mut records = loaded
.unsigned_records()
.map(OldRecord::from)
Expand Down Expand Up @@ -507,7 +506,7 @@ impl ZoneSigner {
//
// Sort them into DNSSEC order ready for NSEC(3) generation.
//
debug!("[ZS]: Sorting collected records for zone '{zone_name}'.");
debug!("Sorting collected records for zone '{zone_name}'.");
status.write().unwrap().current_action = "Sorting records".to_string();
let sort_start = Instant::now();
// Note: This may briefly use lots of CPU and many CPU cores.
Expand All @@ -526,7 +525,7 @@ impl ZoneSigner {
//
// Generate NSEC(3) RRs.
//
debug!("[ZS]: Generating denial records for zone '{zone_name}'.");
debug!("Generating denial records for zone '{zone_name}'.");
status.write().unwrap().current_action = "Generating denial records".to_string();
let denial_start = Instant::now();
match &signing_config.denial {
Expand Down Expand Up @@ -594,7 +593,7 @@ impl ZoneSigner {
// async task which receives generated RRSIGs via a Tokio
// mpsc::channel and accumulates them into the signed zone.
//
debug!("[ZS]: Generating RRSIG records.");
debug!("Generating RRSIG records.");
status.write().unwrap().current_action = "Generating signature records".to_string();

// TODO: Configure Rayon's thread pool to set the number of threads. By
Expand All @@ -615,71 +614,111 @@ impl ZoneSigner {
// needs a slice of references, so we need to build that here.
let keys = signing_keys.iter().collect::<Vec<_>>();

// TODO: This generation code is incorrect; 'sign_sorted_zone_records'
// looks for zone cuts, but zone cuts may need to be detected _across_
// the segments we split the records into. Zone cut detection needs to
// be re-implemented here with parallel execution in mind. This also
// applies to NSEC(3) generation, but it is currently single-threaded.

// Disable parallel signing for now. This may also split RRsets.
let signatures = if false {
// Split the records into segments.
let segments = rayon::iter::split(0..unsigned_records.len(), |range| {
// Always sign at least 1024 records at a time.
if range.len() < 1024 {
return (range, None);
}
// Sign the records in parallel.
//
// Records are split at top-level labels within the zone (e.g. between
// 'foo.example.org' and 'bar.example.org' in the 'example.org' zone);
// for TLDs, which are by far the largest zones, this will result
// in many small chunks which can be signed in parallel. Signing
// considerations do not cross these boundaries; e.g. a zone cut (an
// `NS` record) in one top-level name does not affect any other.

// Split the records into segments.
let segments = rayon::iter::split(0..unsigned_records.len(), |range| {
// Always sign at least 1024 records at a time.
if range.len() < 1024 {
return (range, None);
}

let midpoint = range.start + range.len() / 2;
let left = range.start..midpoint;
let right = midpoint..range.end;
(left, Some(right))
});

// Generate signatures from each segment.
let signatures = segments.map(|range| {
sign_sorted_zone_records(
&zone.name,
RecordsIter::new_from_owned(&unsigned_records[range]),
&keys,
&rrsig_cfg,
)
});

// Convert the signatures into new-base types and collect them together.
// If errors occur, one error is arbitrarily chosen and returned.
signatures
.try_fold(Vec::new, |mut a, b| {
a.extend(b?.into_iter().map(|r| OldRecord::from_record(r).into()));
Ok::<_, SigningError>(a)
// Find a top-level label boundary close to the midpoint.
//
// We could search _around_ the exact midpoint rather than right
// after it, but there's no significant advantage to this. Most
// top-level labels (in big zones) will have few records.
let midpoint = range.start + range.len() / 2;
let mut prev_top_level_label = None;
let midpoint = unsigned_records[midpoint..range.end]
.iter()
// Extract the top-level label within the zone. If the record
// is at the zone apex, store 'Some(None)'. If the record falls
// outside the zone, store 'None'.
.map(|r| {
// This is mostly complicated due to borrowing issues.
r.owner()
.for_slice()
.for_ref()
.strip_suffix(zone_name)
.ok()
.map(|rn| RelativeName::from_slice(rn.into_octets()).unwrap().last())
})
.try_reduce(Vec::new, |mut a, mut b| {
a.append(&mut b);
Ok(a)
// Find the first position where the extracted values differ
// between adjacent pairs of records. This will return true if
// and only if:
// 1. The records are both within the zone and differ in their
// top-level labels.
// 2. The latter record falls outside the zone. (The former
// record could not have fallen outside the zone, since if
// it did, '.position()' would have terminated already.)
// We do not generate signatures for records outside the zone,
// so we allow splitting anywhere within them.
.position(|top_level_label| {
// If this record falls outside the zone, stop immediately.
let Some(curr) = top_level_label else {
return true;
};

// Retrieve the previous top-level label, and update it
// for the next iteration.
let prev = prev_top_level_label.replace(curr);

// If there was no previous top-level label (i.e. this is
// the first iteration), don't split here.
let Some(prev) = prev else {
return false;
};

// Allow splitting here if the adjacent top-level labels
// differ.
prev != curr
})
.map_err(|err| SignerError::SigningError(err.to_string()))?
} else {
let signatures = sign_sorted_zone_records(
.map(|pos| midpoint + pos);

// If we could not find a useful midpoint, give up. This should be
// very rare, since 'range' covers at least 1024 records.
let Some(midpoint) = midpoint else {
trace!("Could not find a useful midpoint");
return (range, None);
};

// Split the records at this midpoint. The record *at* the midpoint
// has a different top-level label from the record *before* it.
let left = range.start..midpoint;
let right = midpoint..range.end;
(left, Some(right))
});

// Generate signatures from each segment.
let signatures = segments.map(|range| {
sign_sorted_zone_records(
&zone.name,
RecordsIter::new_from_owned(&unsigned_records),
RecordsIter::new_from_owned(&unsigned_records[range]),
&keys,
&rrsig_cfg,
)
});

// Convert the signatures into new-base types and collect them together.
// If errors occur, one error is arbitrarily chosen and returned.
let signatures: Vec<RegularRecord> = signatures
.try_fold(Vec::new, |mut a, b| {
a.extend(b?.into_iter().map(|r| OldRecord::from_record(r).into()));
Ok::<_, SigningError>(a)
})
.try_reduce(Vec::new, |mut a, mut b| {
a.append(&mut b);
Ok(a)
})
.map_err(|err| SignerError::SigningError(err.to_string()))?;
let signatures: Vec<RegularRecord> = signatures
.into_iter()
.map(|s| {
let r = Record::new(
s.owner().clone(),
s.class(),
s.ttl(),
ZoneRecordData::Rrsig(s.data().clone()),
);
r.into()
})
.collect();
signatures
};

let total_signatures = signatures.len();

Expand All @@ -694,7 +733,7 @@ impl ZoneSigner {
writer.set_soa(new_soa.clone()).unwrap();
writer.apply().unwrap();

debug!("SIGNER: Determining min expiration time");
debug!("Determining min expiration time");
let reader = builder.next_signed().unwrap();
let min_expiration = Arc::new(MinTimestamp::new());
let saved_min_expiration = min_expiration.clone();
Expand Down Expand Up @@ -724,7 +763,7 @@ impl ZoneSigner {
// this value should be moved to min_expiration.
zone_state.next_min_expiration = saved_min_expiration.get();
debug!(
"SIGNER: Determined min expiration time: {:?}",
"Determined min expiration time: {:?}",
zone_state.next_min_expiration
);

Expand Down Expand Up @@ -842,7 +881,7 @@ impl ZoneSigner {
if let Some(saved_refresh_time) = opt_refresh_time {
if *saved_refresh_time == curr_refresh_time {
// This zone is busy.
trace!("[ZS]: resign: zone {zone_name} is busy");
trace!("resign: zone {zone_name} is busy");
continue;
}

Expand Down Expand Up @@ -922,7 +961,7 @@ impl ZoneSigner {
let refresh_time = UNIX_EPOCH + Duration::from(curr_refresh_time.clone());

if refresh_time < now {
trace!("[ZS]: re-signing: request signing of zone {zone_name}");
trace!("re-signing: request signing of zone {zone_name}");

// Start a new block to make sure the mutex is released.
{
Expand Down Expand Up @@ -1219,23 +1258,29 @@ impl ZoneSignerStatus {
let q_item = q_item.read().unwrap();
match q_item.status {
ZoneSigningStatus::Requested(_) => {
debug!("[ZS]: Queue item: {} => requested", q_item.zone.name)
debug!("Queue item: {} => requested", q_item.zone.name)
}
ZoneSigningStatus::InProgress(_) => {
debug!("[ZS]: Queue item: {} => in-progress", q_item.zone.name)
debug!("Queue item: {} => in-progress", q_item.zone.name)
}
ZoneSigningStatus::Finished(_) => {
debug!("[ZS]: Queue item: {} => finished", q_item.zone.name)
debug!("Queue item: {} => finished", q_item.zone.name)
}
ZoneSigningStatus::Aborted => {
debug!("[ZS]: Queue item: {} => aborted", q_item.zone.name)
debug!("Queue item: {} => aborted", q_item.zone.name)
}
};
}
}
}

/// Enqueue a zone for signing.
#[allow(clippy::type_complexity)] // TODO
#[tracing::instrument(
level = "debug",
skip_all,
fields(zone = %zone.name),
)]
pub async fn enqueue(
&self,
zone: &Arc<Zone>,
Expand All @@ -1249,7 +1294,7 @@ impl ZoneSignerStatus {
SignerError,
> {
let zone_name = &zone.name;
debug!("SIGNER[{zone_name}]: Adding to the queue");
debug!("Adding to the queue");
let status = Arc::new(RwLock::new(SigningStatusPerZone {
zone: zone.clone(),
current_action: "Waiting for any existing signing operation for this zone to finish"
Expand All @@ -1262,9 +1307,9 @@ impl ZoneSignerStatus {
}

let approx_q_size = SIGNING_QUEUE_SIZE - self.queue_semaphore.available_permits() + 1;
debug!("SIGNER[{zone_name}]: Approx queue size = {approx_q_size}");
debug!("Approx queue size = {approx_q_size}");

debug!("SIGNER[{zone_name}]: Acquiring zone permit");
debug!("Acquiring zone permit");
let zone_semaphore = self
.zone_semaphores
.write()
Expand All @@ -1275,18 +1320,18 @@ impl ZoneSignerStatus {
let zone_permit = zone_semaphore.acquire_owned().await.map_err(|_| {
SignerError::InternalError("Cannot acquire the zone semaphore".to_string())
})?;
debug!("SIGNER[{zone_name}]: Zone permit acquired");
debug!("Zone permit acquired");

status.write().unwrap().current_action = "Waiting for a signing queue slot".to_string();

debug!("SIGNER: Acquiring queue permit");
debug!("Acquiring queue permit");
let queue_permit = self
.queue_semaphore
.clone()
.acquire_owned()
.await
.map_err(|_| SignerError::SignerNotReady)?;
debug!("SIGNER[{zone_name}]: Queue permit acquired");
debug!("Queue permit acquired");

// If we were able to acquire a permit that means that a signing operation completed
// and so we are safe to remove one item from the ring buffer.
Expand All @@ -1311,7 +1356,7 @@ impl ZoneSignerStatus {

status.write().unwrap().current_action = "Queued for signing".to_string();

debug!("SIGNER[{zone_name}]: Enqueuing complete.");
debug!("Enqueuing complete.");
Ok((approx_q_size, queue_permit, zone_permit, status))
}
}
Expand Down
Loading