Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 107 additions & 39 deletions internal/datastore/crdb/partitioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
"github.com/authzed/spicedb/pkg/tuple"
)

// queryShowRanges queries ranges from the primary index only, including
// each range's size in bytes. Using SHOW RANGES FROM TABLE would include
// secondary index ranges whose start keys have columns in a different order
// than the PK, producing partition bounds that overlap when compared in PK
// tuple order. WITH DETAILS exposes range_size, which we use to balance
// partitions by data volume rather than by range count.
const queryShowRanges = "SELECT start_key, range_size FROM [SHOW RANGES FROM INDEX %s@primary WITH DETAILS] ORDER BY start_key"

var (
// SinglePartitionRange is a single partition covering the entire table,
Expand All @@ -31,9 +33,17 @@
_ datastore.BulkExportPartitioner = (*crdbDatastore)(nil)
)

// rangeInfo describes a single CRDB range whose start_key parses to a valid
// PK cursor (a usable split candidate), along with its size in bytes.
type rangeInfo struct {
	cursor options.Cursor // PK tuple parsed from the range's start_key; the range's lower bound
	size   int64          // range_size reported by SHOW RANGES ... WITH DETAILS, in bytes
}

// PlanPartitions splits the relationship table into non-overlapping key ranges
// for parallel bulk export. It uses CRDB's SHOW RANGES to align partitions
// with physical data distribution.
// with physical data distribution, and balances partitions by total range size
// so that workers process roughly equal amounts of data.
//
// CRDB automatically splits ranges when they exceed range_max_bytes (default
// 512MB), regardless of the number of nodes. A table with tens of billions of
Expand All @@ -45,98 +55,156 @@
return SinglePartitionRange, nil
}

boundaries, err := cds.rangeBoundaries(ctx)
ranges, err := cds.rangeInfos(ctx)
if err != nil {
log.Ctx(ctx).Warn().Err(err).Uint32("desired_partitions", desiredCount).
Msg("SHOW RANGES failed, returning single partition")
return SinglePartitionRange, nil
}

if len(boundaries) == 0 {
if len(ranges) == 0 {
log.Ctx(ctx).Info().Uint32("desired_partitions", desiredCount).
Str("table", cds.schema.RelationshipTableName).
Msg("no parseable range boundaries found (table may be < 512MB), returning single partition")
return SinglePartitionRange, nil
}

partitions := groupBoundaries(boundaries, desiredCount)
partitions := groupRanges(ctx, ranges, desiredCount)
log.Ctx(ctx).Info().
Uint32("desired_partitions", desiredCount).
Int("parseable_boundaries", len(boundaries)).
Int("parseable_ranges", len(ranges)).
Int("actual_partitions", len(partitions)).
Str("table", cds.schema.RelationshipTableName).
Msg("planned partitioned export from SHOW RANGES")

return partitions, nil
}

// groupBoundaries takes N split-point boundaries and a desired partition count K,
// and returns up to K non-overlapping PartitionRange values.
// N boundaries divide the key space into N+1 regions.
// Boundaries must be sorted in ascending key order.
func groupBoundaries(boundaries []options.Cursor, desiredCount uint32) []datastore.PartitionRange {
if desiredCount <= 1 || len(boundaries) == 0 {
// groupRanges takes a sorted list of ranges (each with a parseable cursor
// usable as a split point and a size) and a desired partition count K. It
// returns up to K non-overlapping PartitionRange values, choosing boundaries
// so that the cumulative size of each partition is as close as possible to
// totalSize/K. If fewer than K-1 ranges are available, fewer than K
// partitions are returned.
//
// Ranges must be sorted in ascending key order. Each range's cursor is the
// lower bound of that range (and, equivalently, the upper bound of the
// preceding range); a split at ranges[i].cursor closes off the partition
// containing ranges[0..i-1] and starts a new one.
func groupRanges(ctx context.Context, ranges []rangeInfo, desiredCount uint32) []datastore.PartitionRange {
if desiredCount <= 1 || len(ranges) == 0 {
return SinglePartitionRange
}

K := int(desiredCount)
N := len(boundaries)
if K > N+1 {
K = N + 1
var totalSize int64
for _, r := range ranges {
totalSize += r.size
}

// If every range reports size 0 (e.g., the table is brand new and CRDB
// hasn't accounted for it yet), fall back to range-count balancing by
// treating each range as one unit. This is logged at info level because
// it materially changes how splits are chosen — partitions will be
// balanced by range count, not by data volume.
useUnitSize := totalSize == 0
if useUnitSize {
log.Ctx(ctx).Info().
Int("ranges", len(ranges)).
Msg("range_size unavailable for all ranges; falling back to range-count balancing")
totalSize = int64(len(ranges))

Check failure on line 115 in internal/datastore/crdb/partitioner.go

View workflow job for this annotation

GitHub Actions / Lint Everything

In package github.com/authzed/spicedb/internal/datastore/crdb: found downcast of `len` call to int64
}

target := totalSize / int64(K)
if target <= 0 {
target = 1
}

partitions := make([]datastore.PartitionRange, K)
// Greedy pack: walk ranges in order, deciding at each cursor whether to
// close off the current partition by splitting there. Each ranges[i].cursor
// is a usable split point — including ranges[0].cursor, which separates
// the unparseable prefix region (whose start_key we can't parse, but which
// holds real data) from the rest. We seed the accumulator with an estimate
// of the prefix region's size so it becomes a candidate split point: one
// unit in unit-size mode, or the average range size in size-aware mode.
// Without this seed the algorithm would always merge the prefix into the
// first partition and produce at most len(ranges) partitions instead of
// len(ranges)+1.
splits := make([]options.Cursor, 0, K-1)
var acc int64
if useUnitSize {
acc = 1
} else if len(ranges) > 0 {
acc = totalSize / int64(len(ranges))

Check failure on line 138 in internal/datastore/crdb/partitioner.go

View workflow job for this annotation

GitHub Actions / Lint Everything

In package github.com/authzed/spicedb/internal/datastore/crdb: found downcast of `len` call to int64
}
for i := 0; i < len(ranges) && len(splits) < K-1; i++ {
if acc >= target {
splits = append(splits, ranges[i].cursor)
acc = 0
}
if useUnitSize {
acc++
} else {
acc += ranges[i].size
}
}

partitions := make([]datastore.PartitionRange, len(splits)+1)
for i := range partitions {
// Select K-1 evenly-spaced boundaries from the N available.
if i > 0 {
idx := (i * N) / K
partitions[i].LowerBound = boundaries[idx]
partitions[i].LowerBound = splits[i-1]
}
if i < K-1 {
idx := ((i + 1) * N) / K
partitions[i].UpperBound = boundaries[idx]
if i < len(splits) {
partitions[i].UpperBound = splits[i]
}
}

return partitions
}

// rangeBoundaries returns the start keys of CRDB ranges for the relationship
// table, parsed into Cursor values. Each cursor represents a PK tuple.
func (cds *crdbDatastore) rangeBoundaries(ctx context.Context) ([]options.Cursor, error) {
// rangeInfos returns CRDB primary-index ranges for the relationship table in
// ascending key order, with each range annotated by its size in bytes. Rows
// whose start_key cannot be parsed as a PK tuple — most commonly the first
// range, whose key is just the table prefix — are skipped: they cannot serve
// as split points, and their size contribution (bounded by range_max_bytes,
// ~512MB) is negligible on the large tables this feature targets.
func (cds *crdbDatastore) rangeInfos(ctx context.Context) ([]rangeInfo, error) {
query := fmt.Sprintf(queryShowRanges, cds.schema.RelationshipTableName)

var boundaries []options.Cursor
var ranges []rangeInfo
var totalRows, skippedRows int
if err := cds.readPool.QueryFunc(ctx, func(ctx context.Context, rows pgx.Rows) error {
for rows.Next() {
totalRows++
var startKey string
if err := rows.Scan(&startKey); err != nil {
return fmt.Errorf("unable to scan range start_key: %w", err)
var (
startKey string
size int64
)

if err := rows.Scan(&startKey, &size); err != nil {
return fmt.Errorf("unable to scan range row: %w", err)

Check warning on line 184 in internal/datastore/crdb/partitioner.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/crdb/partitioner.go#L184

Added line #L184 was not covered by tests
}

cursor, err := parseRangeStartKey(startKey)
if err != nil {
skippedRows++
log.Ctx(ctx).Debug().Err(err).Str("start_key", startKey).Msg("skipping unparseable range boundary")
log.Ctx(ctx).Debug().Err(err).Str("start_key", startKey).Msg("skipping range with unparseable start_key")
continue
}

boundaries = append(boundaries, cursor)
ranges = append(ranges, rangeInfo{cursor: cursor, size: size})
}
return nil
}, query); err != nil {
return nil, fmt.Errorf("range boundaries query failed: %w", err)
return nil, fmt.Errorf("range query failed: %w", err)

Check warning on line 198 in internal/datastore/crdb/partitioner.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/crdb/partitioner.go#L198

Added line #L198 was not covered by tests
}

log.Ctx(ctx).Debug().
Int("total_ranges", totalRows).
Int("parseable_boundaries", len(boundaries)).
Int("parseable_boundaries", len(ranges)).
Int("skipped", skippedRows).
Msg("SHOW RANGES results")

return boundaries, nil
return ranges, nil
}

// parseRangeStartKey parses a CRDB range start key like:
Expand Down
Loading
Loading