From 23747974b6cfa7a233c346847901960063cdb301 Mon Sep 17 00:00:00 2001 From: wangzheyan Date: Tue, 2 Jun 2026 15:20:56 +0800 Subject: [PATCH] feat(index): add distributed vector training --- lance_ray/index.py | 862 ++++++++++++++++++++++++++--- pyproject.toml | 1 + tests/test_distributed_indexing.py | 106 ++++ tests/test_vector_index_options.py | 740 ++++++++++++++++++++++++- uv.lock | 3 + 5 files changed, 1649 insertions(+), 63 deletions(-) diff --git a/lance_ray/index.py b/lance_ray/index.py index 2a3265ed..dc991f5f 100755 --- a/lance_ray/index.py +++ b/lance_ray/index.py @@ -2,11 +2,13 @@ # SPDX-FileCopyrightText: Copyright The Lance Authors import logging +import math import uuid from collections.abc import Callable from typing import Any, Literal, Optional, TypeAlias, Union import lance +import numpy as np import pyarrow as pa import ray from lance.dataset import Index, IndexConfig, LanceDataset @@ -184,6 +186,622 @@ def _put_vector_index_artifacts_in_object_store( ) +def _artifact_len(artifact: Any) -> Optional[int]: + if artifact is None or _is_ray_object_ref(artifact): + return None + try: + return len(artifact) + except TypeError: + return None + + +def _infer_num_partitions_from_artifact( + ivf_centroids: _VectorIndexArtifactRef, +) -> Optional[int]: + return _artifact_len(ivf_centroids) + + +def _infer_num_sub_vectors_from_artifact( + pq_codebook: _VectorIndexArtifactRef, +) -> Optional[int]: + codebook_len = _artifact_len(pq_codebook) + if codebook_len is not None and codebook_len % 256 == 0: + return codebook_len // 256 + return None + + +def _vectors_to_numpy(batch_or_table: Any, column: str) -> np.ndarray: + values = batch_or_table[column] + if hasattr(values, "combine_chunks"): + values = values.combine_chunks() + + if hasattr(values, "to_numpy_ndarray"): + vectors = values.to_numpy_ndarray() + elif pa.types.is_fixed_size_list(values.type): + child_offset = values.offset * values.type.list_size + child_length = len(values) * values.type.list_size + child_values = values.values.slice(child_offset, child_length) + vectors = child_values.to_numpy(zero_copy_only=False).reshape( + len(values), values.type.list_size + ) + else: + vectors = np.asarray(values.to_pylist()) + + vectors = np.asarray(vectors, dtype=np.float32).reshape(len(values), -1) + if vectors.size == 0: + return vectors + finite_mask = np.isfinite(vectors).all(axis=1) + return vectors[finite_mask] + + +def _vector_artifact_to_numpy(artifact: Any) -> np.ndarray: + if artifact is None: + raise ValueError("vector artifact cannot be None") + if hasattr(artifact, "combine_chunks") or hasattr(artifact, "type"): + table = pa.table({"_artifact": artifact}) + return _vectors_to_numpy(table, "_artifact") + vectors = np.asarray(artifact, dtype=np.float32) + return vectors.reshape(len(vectors), -1) + + +def _create_fixed_size_vector_array(vectors: np.ndarray) -> pa.FixedSizeListArray: + vectors = np.asarray(vectors, dtype=np.float32) + if vectors.ndim != 2: + raise ValueError(f"Expected 2-D vector data, got shape {vectors.shape}") + flat = pa.array(vectors.reshape(-1), type=pa.float32()) + return pa.FixedSizeListArray.from_arrays(flat, vectors.shape[1]) + + +def _load_vector_training_sample( + dataset_uri: str, + column: str, + fragment_ids: list[int], + sample_size: int, + storage_options: Optional[dict[str, Any]], + block_size: Optional[int], + namespace_impl: Optional[str], + namespace_properties: Optional[dict[str, str]], + table_id: Optional[list[str]], +) -> np.ndarray: + namespace_kwargs = get_namespace_kwargs( + namespace_impl, namespace_properties, table_id + ) + dataset = LanceDataset( + dataset_uri, + **_dataset_load_kwargs(storage_options, namespace_kwargs, block_size), + ) + fragments = [dataset.get_fragment(fragment_id) for fragment_id in fragment_ids] + scanner = dataset.scanner(columns=[column], fragments=fragments) + + reservoir: np.ndarray | None = None + rows_seen = 0 + reservoir_size = 0 + rng = np.random.default_rng(0) + for batch in scanner.to_batches(): + vectors = _vectors_to_numpy(batch, column) + if vectors.size == 0: + continue + + if reservoir is None: + reservoir = np.empty((sample_size, vectors.shape[1]), dtype=np.float32) + + for vector in vectors: + rows_seen += 1 + if reservoir_size < sample_size: + reservoir[reservoir_size] = vector + reservoir_size += 1 + continue + replacement_index = int(rng.integers(0, rows_seen)) + if replacement_index < sample_size: + reservoir[replacement_index] = vector + + if reservoir is None: + return np.empty((0, 0), dtype=np.float32) + return reservoir[:reservoir_size].astype(np.float32, copy=True) + + +class _VectorTrainingShard: + def __init__( + self, + dataset_uri: str, + column: str, + fragment_ids: list[int], + sample_size: int, + storage_options: Optional[dict[str, Any]], + block_size: Optional[int], + namespace_impl: Optional[str], + namespace_properties: Optional[dict[str, str]], + table_id: Optional[list[str]], + ): + self.vectors = _load_vector_training_sample( + dataset_uri, + column, + fragment_ids, + sample_size, + storage_options, + block_size, + namespace_impl, + namespace_properties, + table_id, + ) + + def metadata(self) -> dict[str, int]: + return _sample_metadata_remote(self.vectors) + + def initial_vectors(self, max_vectors: int) -> np.ndarray: + return _take_initial_vectors_remote(self.vectors, max_vectors) + + def kmeans_partial(self, centroids: np.ndarray, metric: str) -> dict[str, Any]: + return _kmeans_partial(self.vectors, centroids, metric) + + def initial_residuals( + self, + ivf_centroids: np.ndarray, + metric: str, + num_sub_vectors: int, + max_vectors: int, + ) -> np.ndarray: + return _take_initial_residuals_remote( + self.vectors, ivf_centroids, metric, num_sub_vectors, max_vectors + ) + + def pq_partial( + self, ivf_centroids: np.ndarray, codebooks: np.ndarray, metric: str + ) -> dict[str, Any]: + return _pq_kmeans_partial(self.vectors, ivf_centroids, codebooks, metric) + + +def _normalize_metric_for_training(metric: str) -> str: + metric_lower = metric.lower() + if metric_lower in {"l2", "euclidean"}: + return "l2" + if metric_lower == "cosine": + return "cosine" + raise ValueError( + "distributed vector training currently supports metric 'l2' or 'cosine', " + f"got {metric!r}" + ) + + +def _normalize_rows(vectors: np.ndarray) -> np.ndarray: + norms = np.linalg.norm(vectors, axis=1, keepdims=True) + norms[norms == 0] = 1.0 + return vectors / norms + + +def _assign_to_centroids( + vectors: np.ndarray, centroids: np.ndarray, metric: str +) -> tuple[np.ndarray, np.ndarray]: + if len(vectors) == 0: + return np.empty(0, dtype=np.int64), np.empty(0, dtype=np.float32) + + metric = _normalize_metric_for_training(metric) + if metric == "cosine": + vectors_for_distance = _normalize_rows(vectors) + centroids_for_distance = _normalize_rows(centroids) + else: + vectors_for_distance = vectors + centroids_for_distance = centroids + + chunk_size = max(1, min(4096, len(vectors_for_distance))) + assignments: list[np.ndarray] = [] + distances: list[np.ndarray] = [] + centroid_norms = np.sum(centroids_for_distance * centroids_for_distance, axis=1) + for offset in range(0, len(vectors_for_distance), chunk_size): + chunk = vectors_for_distance[offset : offset + chunk_size] + dists = ( + np.sum(chunk * chunk, axis=1, keepdims=True) + + centroid_norms[None, :] + - 2.0 * chunk @ centroids_for_distance.T + ) + dists = np.maximum(dists, 0.0) + chunk_assignments = np.argmin(dists, axis=1) + assignments.append(chunk_assignments.astype(np.int64)) + distances.append( + dists[np.arange(len(chunk)), chunk_assignments].astype(np.float32) + ) + + return np.concatenate(assignments), np.concatenate(distances) + + +def _initial_centroids_from_samples(samples: list[np.ndarray], k: int) -> np.ndarray: + non_empty_samples = [sample for sample in samples if sample.size > 0] + if not non_empty_samples: + raise ValueError("distributed vector training found no sample vectors") + + vectors = np.concatenate(non_empty_samples, axis=0) + if len(vectors) >= k: + indices = np.linspace(0, len(vectors) - 1, k, dtype=np.int64) + return vectors[indices].astype(np.float32, copy=True) + + repeats = math.ceil(k / len(vectors)) + return np.tile(vectors, (repeats, 1))[:k].astype(np.float32, copy=True) + + +def _kmeans_partial( + vectors: np.ndarray, centroids: np.ndarray, metric: str +) -> dict[str, Any]: + if vectors.size == 0: + return { + "sums": np.zeros_like(centroids, dtype=np.float64), + "counts": np.zeros(len(centroids), dtype=np.int64), + "loss": 0.0, + } + + assignments, distances = _assign_to_centroids(vectors, centroids, metric) + sums = np.zeros_like(centroids, dtype=np.float64) + counts = np.bincount(assignments, minlength=len(centroids)).astype(np.int64) + np.add.at(sums, assignments, vectors) + return {"sums": sums, "counts": counts, "loss": float(np.sum(distances))} + + +def _sample_metadata_remote(vectors: np.ndarray) -> dict[str, int]: + if vectors.size == 0: + return {"num_rows": 0, "dimension": 0} + return {"num_rows": len(vectors), "dimension": vectors.shape[1]} + + +def _take_initial_vectors_remote(vectors: np.ndarray, max_vectors: int) -> np.ndarray: + if vectors.size == 0 or max_vectors <= 0: + dimension = vectors.shape[1] if vectors.ndim == 2 else 0 + return np.empty((0, dimension), dtype=np.float32) + if len(vectors) <= max_vectors: + return vectors.astype(np.float32, copy=True) + indices = np.linspace(0, len(vectors) - 1, max_vectors, dtype=np.int64) + return vectors[indices].astype(np.float32, copy=True) + + +def _combine_kmeans_partials( + partials: list[dict[str, Any]], previous_centroids: np.ndarray +) -> tuple[np.ndarray, float]: + sums = np.sum([partial["sums"] for partial in partials], axis=0) + counts = np.sum([partial["counts"] for partial in partials], axis=0) + next_centroids = previous_centroids.astype(np.float32, copy=True) + populated = counts > 0 + next_centroids[populated] = (sums[populated] / counts[populated, None]).astype( + np.float32 + ) + loss = float(sum(partial["loss"] for partial in partials)) + return next_centroids, loss + + +def _run_distributed_kmeans( + shards: list[Any], + initial_centroids: np.ndarray, + *, + k: int, + metric: str, + max_iters: int, + tolerance: float, + ray_remote_args: Optional[dict[str, Any]], +) -> np.ndarray: + if len(initial_centroids) != k: + raise ValueError( + f"initial_centroids length must match k, got {len(initial_centroids)} and {k}" + ) + centroids = initial_centroids.astype(np.float32, copy=True) + + previous_loss: Optional[float] = None + for _ in range(max_iters): + partial_refs = [ + shard.kmeans_partial.remote(centroids, metric) for shard in shards + ] + partials = ray.get(partial_refs) + next_centroids, loss = _combine_kmeans_partials(partials, centroids) + if previous_loss is not None: + delta = abs(previous_loss - loss) + if delta <= tolerance * max(previous_loss, 1.0): + centroids = next_centroids + break + previous_loss = loss + centroids = next_centroids + + if metric == "cosine": + centroids = _normalize_rows(centroids).astype(np.float32) + return centroids + + +def _default_num_sub_vectors(dimension: int) -> int: + if dimension % 16 == 0: + return max(1, dimension // 16) + if dimension % 8 == 0: + return max(1, dimension // 8) + raise ValueError( + "num_sub_vectors must be provided when vector dimension is not divisible " + f"by 8 or 16, got dimension={dimension}" + ) + + +def _compute_residuals( + vectors: np.ndarray, ivf_centroids: np.ndarray, metric: str +) -> np.ndarray: + assignments, _ = _assign_to_centroids(vectors, ivf_centroids, metric) + return vectors - ivf_centroids[assignments] + + +def _take_initial_residuals_remote( + vectors: np.ndarray, + ivf_centroids: np.ndarray, + metric: str, + num_sub_vectors: int, + max_vectors: int, +) -> np.ndarray: + if vectors.size == 0 or max_vectors <= 0: + return np.empty((0, num_sub_vectors, 0), dtype=np.float32) + residuals = _compute_residuals(vectors, ivf_centroids, metric) + residuals = _take_initial_vectors_remote(residuals, max_vectors) + return residuals.reshape(len(residuals), num_sub_vectors, -1).astype( + np.float32, + copy=False, + ) + + +def _initial_pq_codebooks_from_residuals( + residual_samples: list[np.ndarray], + pq_centroids: int, +) -> np.ndarray: + non_empty_samples = [sample for sample in residual_samples if sample.size > 0] + if not non_empty_samples: + raise ValueError("distributed PQ training found no sample vectors") + + num_sub_vectors = non_empty_samples[0].shape[1] + sub_vectors = np.concatenate(non_empty_samples, axis=0) + codebooks = [] + for subvector_index in range(num_sub_vectors): + codebooks.append( + _initial_centroids_from_samples( + [sub_vectors[:, subvector_index, :]], pq_centroids + ) + ) + return np.stack(codebooks).astype(np.float32) + + +def _pq_kmeans_partial( + vectors: np.ndarray, + ivf_centroids: np.ndarray, + codebooks: np.ndarray, + metric: str, +) -> dict[str, Any]: + num_sub_vectors, pq_centroids, sub_vector_dim = codebooks.shape + sums = np.zeros_like(codebooks, dtype=np.float64) + counts = np.zeros((num_sub_vectors, pq_centroids), dtype=np.int64) + if vectors.size == 0: + return {"sums": sums, "counts": counts, "loss": 0.0} + + residuals = _compute_residuals(vectors, ivf_centroids, metric) + sub_vectors = residuals.reshape(len(residuals), num_sub_vectors, sub_vector_dim) + loss = 0.0 + for subvector_index in range(num_sub_vectors): + assignments, distances = _assign_to_centroids( + sub_vectors[:, subvector_index, :], + codebooks[subvector_index], + metric, + ) + counts[subvector_index] = np.bincount( + assignments, minlength=pq_centroids + ).astype(np.int64) + np.add.at( + sums[subvector_index], assignments, sub_vectors[:, subvector_index, :] + ) + loss += float(np.sum(distances)) + return {"sums": sums, "counts": counts, "loss": loss} + + +def _combine_pq_partials( + partials: list[dict[str, Any]], previous_codebooks: np.ndarray +) -> tuple[np.ndarray, float]: + sums = np.sum([partial["sums"] for partial in partials], axis=0) + counts = np.sum([partial["counts"] for partial in partials], axis=0) + next_codebooks = previous_codebooks.astype(np.float32, copy=True) + populated = counts > 0 + next_codebooks[populated] = (sums[populated] / counts[populated][:, None]).astype( + np.float32 + ) + loss = float(sum(partial["loss"] for partial in partials)) + return next_codebooks, loss + + +def _run_distributed_pq_training( + shards: list[Any], + initial_residual_samples: list[np.ndarray], + *, + ivf_centroids: np.ndarray, + metric: str, + num_sub_vectors: int, + max_iters: int, + tolerance: float, + ray_remote_args: Optional[dict[str, Any]], +) -> np.ndarray: + codebooks = _initial_pq_codebooks_from_residuals( + initial_residual_samples, + pq_centroids=256, + ) + previous_loss: Optional[float] = None + for _ in range(max_iters): + partial_refs = [ + shard.pq_partial.remote(ivf_centroids, codebooks, metric) + for shard in shards + ] + partials = ray.get(partial_refs) + next_codebooks, loss = _combine_pq_partials(partials, codebooks) + if previous_loss is not None: + delta = abs(previous_loss - loss) + if delta <= tolerance * max(previous_loss, 1.0): + codebooks = next_codebooks + break + previous_loss = loss + codebooks = next_codebooks + return codebooks + + +def _train_vector_index_artifacts_distributed( + *, + dataset_uri: str, + column: str, + index_type: str, + metric: str, + num_partitions: Optional[int], + num_sub_vectors: Optional[int], + sample_rate: int, + fragment_batches: list[list[int]], + num_workers: int, + storage_options: Optional[dict[str, Any]], + block_size: Optional[int], + namespace_impl: Optional[str], + namespace_properties: Optional[dict[str, str]], + table_id: Optional[list[str]], + ray_remote_args: Optional[dict[str, Any]], + ivf_centroids: _VectorIndexArtifact = None, + pq_codebook: _VectorIndexArtifact = None, + max_iters: int = 50, + tolerance: float = 1e-4, +) -> tuple[_VectorIndexArtifact, _VectorIndexArtifact, int, Optional[int]]: + metric = _normalize_metric_for_training(metric) + pq_index_types = {"IVF_PQ", "IVF_HNSW_PQ"} + needs_pq = index_type in pq_index_types + ivf_centroids_np = None + + dataset = LanceDataset( + dataset_uri, + **_dataset_load_kwargs( + storage_options, + get_namespace_kwargs(namespace_impl, namespace_properties, table_id), + block_size, + ), + ) + num_rows = dataset.count_rows() + + if ivf_centroids is not None: + ivf_centroids_np = _vector_artifact_to_numpy(ivf_centroids) + if num_partitions is None: + num_partitions = len(ivf_centroids_np) + elif len(ivf_centroids_np) != num_partitions: + raise ValueError( + "num_partitions must match provided ivf_centroids length, got " + f"num_partitions={num_partitions}, centroids={len(ivf_centroids_np)}" + ) + elif num_partitions is None: + num_partitions = max(1, min(num_rows, round(math.sqrt(num_rows)))) + + ivf_sample_size = 0 if ivf_centroids is not None else num_partitions * sample_rate + pq_sample_size = 0 + if needs_pq and pq_codebook is None: + pq_sample_size = 256 * sample_rate + + # IVF and PQ have different sample-size requirements; PQ needs enough + # residual samples for 256 codewords per subvector. + sample_size = max(ivf_sample_size, pq_sample_size, num_workers) + sample_size_per_batch = max(1, math.ceil(sample_size / len(fragment_batches))) + shard_actor = ray.remote(_VectorTrainingShard) + if ray_remote_args: + shard_actor = shard_actor.options(**ray_remote_args) + + # Keep sampled vectors inside one actor per shard. The driver only pulls + # scalar metadata, bounded initialization slices, and KMeans partials. + shards = [ + shard_actor.remote( + dataset_uri, + column, + fragment_batch, + sample_size_per_batch, + storage_options, + block_size, + namespace_impl, + namespace_properties, + table_id, + ) + for fragment_batch in fragment_batches + ] + + # Dimension discovery stays on shard actors; no full sample reaches driver. + sample_metadata = ray.get([shard.metadata.remote() for shard in shards]) + dimension = next( + (item["dimension"] for item in sample_metadata if item["num_rows"]), None + ) + if dimension is None: + raise ValueError("distributed vector training found no vectors to train on") + if ivf_centroids_np is not None and ivf_centroids_np.shape[1] != dimension: + raise ValueError( + "provided ivf_centroids dimension does not match vector column, got " + f"centroids={ivf_centroids_np.shape[1]}, column={dimension}" + ) + + if needs_pq: + if pq_codebook is not None and num_sub_vectors is None: + num_sub_vectors = _infer_num_sub_vectors_from_artifact(pq_codebook) + if num_sub_vectors is None: + num_sub_vectors = _default_num_sub_vectors(dimension) + if num_sub_vectors <= 0: + raise ValueError(f"num_sub_vectors must be positive, got {num_sub_vectors}") + if dimension % num_sub_vectors != 0: + raise ValueError( + "num_sub_vectors must divide vector dimension, got " + f"dimension={dimension}, num_sub_vectors={num_sub_vectors}" + ) + + if ivf_centroids_np is None: + logger.info( + "Training IVF centroids with distributed KMeans: partitions=%d, samples=%d", + num_partitions, + sample_size, + ) + initial_sample_size = max(1, math.ceil(num_partitions / len(shards))) + initial_samples = ray.get( + [shard.initial_vectors.remote(initial_sample_size) for shard in shards] + ) + initial_centroids = _initial_centroids_from_samples( + initial_samples, num_partitions + ) + ivf_centroids_np = _run_distributed_kmeans( + shards, + initial_centroids, + k=num_partitions, + metric=metric, + max_iters=max_iters, + tolerance=tolerance, + ray_remote_args=ray_remote_args, + ) + ivf_centroids = _create_fixed_size_vector_array(ivf_centroids_np) + + pq_codebook_artifact = pq_codebook + if needs_pq: + assert num_sub_vectors is not None + if pq_codebook_artifact is None: + logger.info( + "Training PQ codebook with distributed KMeans: subvectors=%d", + num_sub_vectors, + ) + residual_sample_size = max(1, math.ceil(256 / len(shards))) + # PQ needs only a bounded residual sample to seed codebooks; KMeans + # iterations still use all per-shard samples through shard actors. + initial_residual_samples = ray.get( + [ + shard.initial_residuals.remote( + ivf_centroids_np, + metric, + num_sub_vectors, + residual_sample_size, + ) + for shard in shards + ] + ) + pq_codebooks_np = _run_distributed_pq_training( + shards, + initial_residual_samples, + ivf_centroids=ivf_centroids_np, + metric=metric, + num_sub_vectors=num_sub_vectors, + max_iters=max_iters, + tolerance=tolerance, + ray_remote_args=ray_remote_args, + ) + pq_codebook_artifact = _create_fixed_size_vector_array( + pq_codebooks_np.reshape(-1, pq_codebooks_np.shape[-1]) + ) + + return ivf_centroids, pq_codebook_artifact, num_partitions, num_sub_vectors + + def _handle_fragment_index( dataset_uri: str, column: str, @@ -673,22 +1291,22 @@ def _check_pylance_version() -> None: try: lance_version = version.parse(lance.__version__) - min_required_version = version.parse("0.36.0") + min_required_version = version.parse("7.0.0b7") if lance_version < min_required_version: raise RuntimeError( - "Distributed vector indexing requires pylance >= 0.36.0, but found " + "Distributed vector indexing requires pylance >= 7.0.0b7, but found " f"{lance.__version__}. The distributed vector interfaces are not " "available in older versions. Please upgrade pylance by running: " "pip install --upgrade pylance" ) - logger.info("Pylance version check passed: %s >= 0.36.0", lance.__version__) + logger.info("Pylance version check passed: %s >= 7.0.0b7", lance.__version__) except AttributeError as err: # pragma: no cover - defensive raise RuntimeError( "Cannot determine pylance version. Distributed vector indexing requires " - "pylance >= 0.36.0. Please upgrade pylance by running: " + "pylance >= 7.0.0b7. Please upgrade pylance by running: " "pip install --upgrade pylance" ) from err @@ -815,11 +1433,16 @@ def create_index( namespace_impl: Optional[str] = None, namespace_properties: Optional[dict[str, str]] = None, table_id: Optional[list[str]] = None, + fragment_ids: Optional[list[int]] = None, ray_remote_args: Optional[dict[str, Any]] = None, metric: str = "l2", num_partitions: Optional[int] = None, num_sub_vectors: Optional[int] = None, sample_rate: int = 256, + segment_native: bool = True, + training_mode: Literal["driver", "distributed"] = "driver", + training_max_iters: int = 50, + training_tolerance: float = 1e-4, ivf_centroids: Optional[ pa.Array | pa.FixedSizeListArray | pa.FixedShapeTensorArray ] = None, @@ -831,7 +1454,7 @@ def create_index( """Build distributed vector indices with Ray. This function mirrors :func:`create_scalar_index` but targets the precise - vector index families supported by Lance's distributed merge pipeline. + vector index families supported by Lance's distributed segment APIs. Args: uri: Lance dataset or URI to build index on @@ -842,11 +1465,20 @@ def create_index( num_workers: Number of Ray workers to use (keyword-only) storage_options: Storage options for the dataset (keyword-only) block_size: Block size in bytes to use when loading the dataset (keyword-only) + fragment_ids: Optional list of fragment IDs to build index segments on. ray_remote_args: Options for Ray tasks (keyword-only) metric: Distance metric to use (default: "l2") num_partitions: Number of IVF partitions (optional) num_sub_vectors: Number of PQ sub-vectors (optional) sample_rate: Number of rows sampled per IVF partition and PQ centroid (default: 256) + segment_native: Whether to directly commit worker-produced segments. + Defaults to True. Set to False to use the driver-finalized + compatibility path that runs build_all() before commit. + training_mode: Where missing IVF/PQ training artifacts are trained. + "driver" preserves Lance's existing driver IndicesBuilder path. + "distributed" runs Ray-distributed KMeans for IVF and PQ training. + training_max_iters: Maximum KMeans iterations for distributed training. + training_tolerance: Relative KMeans loss tolerance for distributed training. ivf_centroids: Pre-computed IVF centroids (optional) pq_codebook: Pre-computed PQ codebook (optional) **kwargs: Additional arguments to pass to the fragment index build entrypoint @@ -866,6 +1498,22 @@ def create_index( if sample_rate <= 0: raise ValueError(f"sample_rate must be positive, got {sample_rate}") + if training_mode not in {"driver", "distributed"}: + raise ValueError( + "training_mode must be one of {'driver', 'distributed'}, " + f"got {training_mode!r}" + ) + + if training_max_iters <= 0: + raise ValueError( + f"training_max_iters must be positive, got {training_max_iters}" + ) + + if training_tolerance < 0: + raise ValueError( + f"training_tolerance must be non-negative, got {training_tolerance}" + ) + if block_size is not None and block_size <= 0: raise ValueError(f"block_size must be positive, got {block_size}") @@ -946,66 +1594,148 @@ def create_index( if not fragments: raise ValueError("Dataset contains no fragments") + if fragment_ids is not None: + if not fragment_ids: + raise ValueError("fragment_ids cannot be empty") + + available_fragment_ids = {fragment.fragment_id for fragment in fragments} + invalid_fragments = set(fragment_ids) - available_fragment_ids + if invalid_fragments: + raise ValueError( + f"Fragment IDs {invalid_fragments} do not exist in dataset" + ) + + requested_fragment_ids = set(fragment_ids) + fragments = [ + fragment + for fragment in fragments + if fragment.fragment_id in requested_fragment_ids + ] + fragment_ids_to_use = [fragment.fragment_id for fragment in fragments] if num_workers > len(fragment_ids_to_use): num_workers = len(fragment_ids_to_use) logger.info("Adjusted num_workers to %d to match fragment count", num_workers) + fragment_batches = _distribute_fragments_balanced( + fragments, num_workers=num_workers, logger=logger + ) + ivf_centroids_artifact = ivf_centroids pq_codebook_artifact = pq_codebook pq_index_types = {"IVF_PQ", "IVF_HNSW_PQ"} needs_pq = index_type_name in pq_index_types + needs_ivf_training = ivf_centroids_artifact is None + needs_pq_training = needs_pq and pq_codebook_artifact is None - # Always perform global IVF training up front so that all shards share the - # same centroids and number of partitions. The Ray entrypoint owns the - # lifecycle of these artifacts and distributes them to workers. - logger.info( - "Phase 1: Training IVF centroids (index_type=%s, metric=%s)", - index_type_name, - metric_lower, - ) - builder = IndicesBuilder(dataset_obj, column) - num_rows = dataset_obj.count_rows() - dimension = builder.dimension - - requested_num_partitions = num_partitions - logger.info( - "Training IVF with requested_num_partitions=%s, num_rows=%d, " - "dimension=%d, sample_rate=%d", - requested_num_partitions, - num_rows, - dimension, - sample_rate, - ) - ivf_model = builder.train_ivf( - num_partitions=requested_num_partitions, - distance_type=metric_lower, - sample_rate=sample_rate, - ) - ivf_centroids_artifact = ivf_model.centroids - num_partitions = ivf_model.num_partitions - logger.info( - "IVF training completed: num_partitions=%d", - num_partitions, - ) + if needs_pq and pq_codebook_artifact is not None and ivf_centroids_artifact is None: + raise ValueError( + "ivf_centroids must be provided together with pq_codebook for " + "PQ-based vector indices; PQ codebooks are trained against IVF " + "residuals and cannot be safely reused without matching IVF centroids" + ) - if needs_pq: - requested_num_sub_vectors = num_sub_vectors + if needs_ivf_training or needs_pq_training: logger.info( - "Training PQ codebook: requested_num_sub_vectors=%s, sample_rate=%d", - requested_num_sub_vectors, - sample_rate, + "Phase 1: Training vector artifacts (mode=%s, index_type=%s, metric=%s)", + training_mode, + index_type_name, + metric_lower, ) - pq_model = builder.train_pq( - ivf_model, - num_subvectors=requested_num_sub_vectors, + + if training_mode == "distributed" and (needs_ivf_training or needs_pq_training): + ( + ivf_centroids_artifact, + pq_codebook_artifact, + num_partitions, + num_sub_vectors, + ) = _train_vector_index_artifacts_distributed( + dataset_uri=dataset_uri, + column=column, + index_type=index_type_name, + metric=metric_lower, + num_partitions=num_partitions, + num_sub_vectors=num_sub_vectors, sample_rate=sample_rate, + fragment_batches=fragment_batches, + num_workers=num_workers, + storage_options=merged_storage_options, + block_size=block_size, + namespace_impl=namespace_impl, + namespace_properties=namespace_properties, + table_id=table_id, + ray_remote_args=ray_remote_args, + ivf_centroids=ivf_centroids_artifact, + pq_codebook=pq_codebook_artifact, + max_iters=training_max_iters, + tolerance=training_tolerance, + ) + elif needs_ivf_training or needs_pq_training: + builder = IndicesBuilder(dataset_obj, column) + num_rows = dataset_obj.count_rows() + dimension = builder.dimension + + if needs_ivf_training: + requested_num_partitions = num_partitions + logger.info( + "Training IVF on driver with requested_num_partitions=%s, " + "num_rows=%d, dimension=%d, sample_rate=%d", + requested_num_partitions, + num_rows, + dimension, + sample_rate, + ) + ivf_model = builder.train_ivf( + num_partitions=requested_num_partitions, + distance_type=metric_lower, + sample_rate=sample_rate, + ) + ivf_centroids_artifact = ivf_model.centroids + num_partitions = ivf_model.num_partitions + logger.info( + "IVF training completed: num_partitions=%d", + num_partitions, + ) + else: + ivf_model = None + + if needs_pq_training: + if ivf_model is None: + raise ValueError( + "pq_codebook must be provided when ivf_centroids is provided " + "in driver training mode; use training_mode='distributed' " + "to train both artifacts together" + ) + requested_num_sub_vectors = num_sub_vectors + logger.info( + "Training PQ codebook on driver: requested_num_sub_vectors=%s, " + "sample_rate=%d", + requested_num_sub_vectors, + sample_rate, + ) + pq_model = builder.train_pq( + ivf_model, + num_subvectors=requested_num_sub_vectors, + sample_rate=sample_rate, + ) + pq_codebook_artifact = pq_model.codebook + num_sub_vectors = pq_model.num_subvectors + logger.info("PQ training completed: num_sub_vectors=%d", num_sub_vectors) + + if num_partitions is None: + num_partitions = _infer_num_partitions_from_artifact(ivf_centroids_artifact) + if needs_pq and num_sub_vectors is None: + num_sub_vectors = _infer_num_sub_vectors_from_artifact(pq_codebook_artifact) + + if not needs_ivf_training and not needs_pq_training: + logger.info( + "Phase 1: Reusing provided vector training artifacts " + "(index_type=%s, metric=%s)", + index_type_name, + metric_lower, ) - pq_codebook_artifact = pq_model.codebook - num_sub_vectors = pq_model.num_subvectors - logger.info("PQ training completed: num_sub_vectors=%d", num_sub_vectors) if ivf_centroids_artifact is None: raise ValueError( @@ -1019,9 +1749,11 @@ def create_index( "distributed vector indices" ) - fragment_batches = _distribute_fragments_balanced( - fragments, num_workers=num_workers, logger=logger - ) + if num_partitions is None: + raise ValueError("num_partitions must be provided when using ivf_centroids") + + if needs_pq and num_sub_vectors is None: + raise ValueError("num_sub_vectors must be provided when using pq_codebook") logger.info( "Phase 2: Distributing vector index build across %d workers for %d fragments", @@ -1074,22 +1806,28 @@ def create_fragment_handler() -> Any: **_dataset_load_kwargs(merged_storage_options, namespace_kwargs, block_size), ) - logger.info( - "Phase 3: Building and committing index segments for vector index '%s'", - name, - ) - successful_results = [r for r in results if r.get("status") == "success"] if not successful_results: raise RuntimeError("No successful vector index creation results found") segment_indices = [r["segment_index"] for r in successful_results] - segment_builder = ( - dataset_obj.create_index_segment_builder() - .with_index_type(index_type_name) - .with_segments(segment_indices) - ) - segments = segment_builder.build_all() + if segment_native: + logger.info( + "Phase 3: Committing segment-native worker segments for vector index '%s'", + name, + ) + segments = segment_indices + else: + logger.info( + "Phase 3: Driver-finalizing and committing index segments for vector index '%s'", + name, + ) + segment_builder = ( + dataset_obj.create_index_segment_builder() + .with_index_type(index_type_name) + .with_segments(segment_indices) + ) + segments = segment_builder.build_all() updated_dataset = dataset_obj.commit_existing_index_segments( index_name=name, diff --git a/pyproject.toml b/pyproject.toml index 9b809d48..252a249a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,6 +27,7 @@ dependencies = [ "pylance>=7.0.0b7", "lance-namespace", "packaging", + "numpy>=1.24", "pyarrow>=17.0.0", "pytest>=8.4.0", "pytest-cov>=5.0.0", diff --git a/tests/test_distributed_indexing.py b/tests/test_distributed_indexing.py index 7adf943d..bad33d81 100755 --- a/tests/test_distributed_indexing.py +++ b/tests/test_distributed_indexing.py @@ -1301,3 +1301,109 @@ def test_build_distributed_vector_index(tmp_path, index_type): ) assert result.num_rows == 5 + + +def test_build_distributed_vector_index_with_distributed_training(tmp_path): + """Build IVF/PQ with Ray-distributed IVF and PQ training enabled.""" + + dataset_uri = generate_multi_fragment_vector_dataset( + tmp_path, num_fragments=2, rows_per_fragment=64, dim=16 + ) + + try: + updated_dataset = lr.create_index( + uri=dataset_uri, + column="vector", + index_type="IVF_PQ", + name="idx_ivf_pq_distributed_training", + num_workers=2, + num_partitions=2, + num_sub_vectors=4, + sample_rate=4, + training_mode="distributed", + training_max_iters=2, + ) + except RuntimeError as exc: + msg = str(exc) + if ( + "Creating empty vector indices with train=False is not yet implemented" + in msg + ): + pytest.skip( + "Current pylance build does not yet support distributed vector " + "indices with train=False; skipping functional test." + ) + raise + + indices = updated_dataset.list_indices() + vec_index = next( + (idx for idx in indices if idx["name"] == "idx_ivf_pq_distributed_training"), + None, + ) + + assert vec_index is not None + assert vec_index["type"] == "IVF_PQ" + + q = [random.gauss(0, 1) for _ in range(16)] + result = updated_dataset.to_table( + nearest={"column": "vector", "q": q, "k": 5}, + columns=["id", "vector"], + ) + + assert result.num_rows == 5 + + +def test_build_distributed_vector_index_with_provided_ivf_training_pq(tmp_path): + """Build IVF/PQ by reusing IVF centroids and distributed-training PQ only.""" + + dataset_uri = generate_multi_fragment_vector_dataset( + tmp_path, num_fragments=2, rows_per_fragment=64, dim=16 + ) + centroids = np.asarray( + [[-1.0] * 16, [1.0] * 16], + dtype=np.float32, + ) + ivf_values = pa.array(centroids.reshape(-1), type=pa.float32()) + ivf_centroids = pa.FixedSizeListArray.from_arrays(ivf_values, 16) + + try: + updated_dataset = lr.create_index( + uri=dataset_uri, + column="vector", + index_type="IVF_PQ", + name="idx_ivf_pq_provided_ivf", + num_workers=2, + num_partitions=2, + num_sub_vectors=4, + sample_rate=4, + training_mode="distributed", + training_max_iters=2, + ivf_centroids=ivf_centroids, + ) + except RuntimeError as exc: + msg = str(exc) + if ( + "Creating empty vector indices with train=False is not yet implemented" + in msg + ): + pytest.skip( + "Current pylance build does not yet support distributed vector " + "indices with train=False; skipping functional test." + ) + raise + + indices = updated_dataset.list_indices() + vec_index = next( + (idx for idx in indices if idx["name"] == "idx_ivf_pq_provided_ivf"), None + ) + + assert vec_index is not None + assert vec_index["type"] == "IVF_PQ" + + q = [random.gauss(0, 1) for _ in range(16)] + result = updated_dataset.to_table( + nearest={"column": "vector", "q": q, "k": 5}, + columns=["id", "vector"], + ) + + assert result.num_rows == 5 diff --git a/tests/test_vector_index_options.py b/tests/test_vector_index_options.py index 8939e913..cbfe7f88 100644 --- a/tests/test_vector_index_options.py +++ b/tests/test_vector_index_options.py @@ -5,6 +5,7 @@ from pathlib import Path from types import ModuleType, SimpleNamespace +import numpy as np import pytest @@ -102,6 +103,11 @@ def count_rows(self): class _FakeSegmentBuilder: + def __init__(self): + self.build_all_calls = 0 + self.index_type = None + self.segments = None + def with_index_type(self, index_type): self.index_type = index_type return self @@ -111,6 +117,7 @@ def with_segments(self, segments): return self def build_all(self): + self.build_all_calls += 1 return ["merged_segment"] @@ -120,6 +127,11 @@ class _FakeDataset: lance_schema = _FakeLanceSchema() version = 1 + def __init__(self): + self.segment_builder_calls = 0 + self.last_segment_builder = None + self.commit_kwargs = None + def get_fragments(self): return [_FakeFragment(0, 100), _FakeFragment(1, 100)] @@ -133,7 +145,9 @@ def create_scalar_index(self, **kwargs): self.scalar_index_kwargs = kwargs def create_index_segment_builder(self): - return _FakeSegmentBuilder() + self.segment_builder_calls += 1 + self.last_segment_builder = _FakeSegmentBuilder() + return self.last_segment_builder def create_index_uncommitted(self, **kwargs): self.vector_index_kwargs = kwargs @@ -267,6 +281,721 @@ def fake_map_async_with_pool(**kwargs): assert "sample_rate" not in captured["fragment_handler_kwargs"] +def test_create_index_uses_provided_vector_training_artifacts(monkeypatch): + """Provided IVF/PQ artifacts should be reused without driver-side training.""" + + captured = {} + fake_dataset = _FakeDataset() + + class FailingIndicesBuilder: + def __init__(self, dataset, column): + raise AssertionError("provided artifacts should skip training") + + def fake_handle_vector_fragment_index(**kwargs): + captured["fragment_handler_kwargs"] = kwargs + return lambda fragment_ids: {"status": "success", "fragment_ids": fragment_ids} + + def fake_put_vector_index_artifacts(ivf_centroids, pq_codebook): + captured["put_artifacts"] = (ivf_centroids, pq_codebook) + return "ivf_ref", "pq_ref" + + def fake_map_async_with_pool(**kwargs): + captured["map_kwargs"] = kwargs + kwargs["create_fragment_handler"]() + return [ + { + "status": "success", + "fragment_ids": [0, 1], + "segment_index": "segment", + } + ] + + monkeypatch.setattr(index_mod, "_check_pylance_version", lambda: None) + monkeypatch.setattr(index_mod, "IndicesBuilder", FailingIndicesBuilder) + monkeypatch.setattr(index_mod, "LanceDataset", lambda *args, **kwargs: fake_dataset) + monkeypatch.setattr( + index_mod, + "_handle_vector_fragment_index", + fake_handle_vector_fragment_index, + ) + monkeypatch.setattr( + index_mod, + "_put_vector_index_artifacts_in_object_store", + fake_put_vector_index_artifacts, + ) + monkeypatch.setattr(index_mod, "_map_async_with_pool", fake_map_async_with_pool) + + updated_dataset = index_mod.create_index( + uri="memory://fake", + column="vector", + index_type="IVF_PQ", + name="vector_idx", + num_workers=2, + num_partitions=4, + num_sub_vectors=4, + ivf_centroids="provided_ivf", + pq_codebook="provided_pq", + ) + + assert updated_dataset is fake_dataset + assert captured["put_artifacts"] == ("provided_ivf", "provided_pq") + assert captured["fragment_handler_kwargs"]["ivf_centroids"] == "ivf_ref" + assert captured["fragment_handler_kwargs"]["pq_codebook"] == "pq_ref" + + +def test_create_index_uses_distributed_vector_training(monkeypatch): + """Distributed training mode should produce shared artifacts before workers build.""" + + captured = {} + fake_dataset = _FakeDataset() + + class FailingIndicesBuilder: + def __init__(self, dataset, column): + raise AssertionError( + "distributed mode should not use driver IndicesBuilder" + ) + + def fake_distributed_training(**kwargs): + captured["distributed_training"] = kwargs + return "dist_ivf", "dist_pq", 4, 2 + + def fake_handle_vector_fragment_index(**kwargs): + captured["fragment_handler_kwargs"] = kwargs + return lambda fragment_ids: {"status": "success", "fragment_ids": fragment_ids} + + def fake_put_vector_index_artifacts(ivf_centroids, pq_codebook): + captured["put_artifacts"] = (ivf_centroids, pq_codebook) + return "ivf_ref", "pq_ref" + + def fake_map_async_with_pool(**kwargs): + captured["map_kwargs"] = kwargs + kwargs["create_fragment_handler"]() + return [ + { + "status": "success", + "fragment_ids": [0, 1], + "segment_index": "segment", + } + ] + + monkeypatch.setattr(index_mod, "_check_pylance_version", lambda: None) + monkeypatch.setattr(index_mod, "IndicesBuilder", FailingIndicesBuilder) + monkeypatch.setattr(index_mod, "LanceDataset", lambda *args, **kwargs: fake_dataset) + monkeypatch.setattr( + index_mod, + "_train_vector_index_artifacts_distributed", + fake_distributed_training, + ) + monkeypatch.setattr( + index_mod, + "_handle_vector_fragment_index", + fake_handle_vector_fragment_index, + ) + monkeypatch.setattr( + index_mod, + "_put_vector_index_artifacts_in_object_store", + fake_put_vector_index_artifacts, + ) + monkeypatch.setattr(index_mod, "_map_async_with_pool", fake_map_async_with_pool) + + updated_dataset = index_mod.create_index( + uri="memory://fake", + column="vector", + index_type="IVF_PQ", + name="vector_idx", + num_workers=2, + num_partitions=4, + num_sub_vectors=2, + sample_rate=8, + training_mode="distributed", + ) + + assert updated_dataset is fake_dataset + assert captured["distributed_training"]["dataset_uri"] == "memory://fake" + assert captured["distributed_training"]["index_type"] == "IVF_PQ" + assert captured["distributed_training"]["num_workers"] == 2 + assert sorted(sum(captured["distributed_training"]["fragment_batches"], [])) == [ + 0, + 1, + ] + assert captured["distributed_training"]["sample_rate"] == 8 + assert captured["put_artifacts"] == ("dist_ivf", "dist_pq") + assert captured["fragment_handler_kwargs"]["ivf_centroids"] == "ivf_ref" + assert captured["fragment_handler_kwargs"]["pq_codebook"] == "pq_ref" + + +def test_create_index_distributed_training_reuses_provided_ivf(monkeypatch): + """Distributed PQ training should preserve caller-provided IVF centroids.""" + + captured = {} + fake_dataset = _FakeDataset() + + class FailingIndicesBuilder: + def __init__(self, dataset, column): + raise AssertionError( + "distributed mode should not use driver IndicesBuilder" + ) + + def fake_distributed_training(**kwargs): + captured["distributed_training"] = kwargs + return kwargs["ivf_centroids"], "dist_pq", 4, 2 + + def fake_handle_vector_fragment_index(**kwargs): + captured["fragment_handler_kwargs"] = kwargs + return lambda fragment_ids: {"status": "success", "fragment_ids": fragment_ids} + + def fake_put_vector_index_artifacts(ivf_centroids, pq_codebook): + captured["put_artifacts"] = (ivf_centroids, pq_codebook) + return "ivf_ref", "pq_ref" + + def fake_map_async_with_pool(**kwargs): + kwargs["create_fragment_handler"]() + return [ + { + "status": "success", + "fragment_ids": [0, 1], + "segment_index": "segment", + } + ] + + monkeypatch.setattr(index_mod, "_check_pylance_version", lambda: None) + monkeypatch.setattr(index_mod, "IndicesBuilder", FailingIndicesBuilder) + monkeypatch.setattr(index_mod, "LanceDataset", lambda *args, **kwargs: fake_dataset) + monkeypatch.setattr( + index_mod, + "_train_vector_index_artifacts_distributed", + fake_distributed_training, + ) + monkeypatch.setattr( + index_mod, + "_handle_vector_fragment_index", + fake_handle_vector_fragment_index, + ) + monkeypatch.setattr( + index_mod, + "_put_vector_index_artifacts_in_object_store", + fake_put_vector_index_artifacts, + ) + monkeypatch.setattr(index_mod, "_map_async_with_pool", fake_map_async_with_pool) + + updated_dataset = index_mod.create_index( + uri="memory://fake", + column="vector", + index_type="IVF_PQ", + name="vector_idx", + num_workers=2, + num_partitions=4, + num_sub_vectors=2, + ivf_centroids="provided_ivf", + training_mode="distributed", + ) + + assert updated_dataset is fake_dataset + assert captured["distributed_training"]["ivf_centroids"] == "provided_ivf" + assert captured["distributed_training"]["pq_codebook"] is None + assert captured["put_artifacts"] == ("provided_ivf", "dist_pq") + assert captured["fragment_handler_kwargs"]["ivf_centroids"] == "ivf_ref" + assert captured["fragment_handler_kwargs"]["pq_codebook"] == "pq_ref" + + +def test_distributed_training_uses_shard_actors_and_pq_sample_size(monkeypatch): + """Distributed training should keep samples in actors and size PQ sampling.""" + + samples_by_fragment = { + (0,): np.asarray( + [[0.0, 0.0, 0.0, 0.0], [0.2, 0.1, 0.0, 0.0]], dtype=np.float32 + ), + (1,): np.asarray( + [[10.0, 10.0, 10.0, 10.0], [10.2, 10.1, 10.0, 10.0]], + dtype=np.float32, + ), + } + requested_sample_sizes = [] + + class FakeDataset: + def count_rows(self): + return 4 + + class FakeResultRef: + def __init__(self, value): + self.value = value + + class FakeActorMethod: + def __init__(self, fn): + self.fn = fn + + def remote(self, *args): + return FakeResultRef(self.fn(*args)) + + class FakeActorHandle: + def __init__(self, actor): + self._actor = actor + + def __getattr__(self, name): + attr = getattr(self._actor, name) + if callable(attr): + return FakeActorMethod(attr) + return attr + + class FakeRemoteActorClass: + def __init__(self, actor_cls): + self.actor_cls = actor_cls + + def options(self, **kwargs): + return self + + def remote(self, *args): + return FakeActorHandle(self.actor_cls(*args)) + + def fake_remote(target): + if target is not index_mod._VectorTrainingShard: + raise AssertionError("distributed training should use shard actors") + return FakeRemoteActorClass(target) + + def fake_load_vector_training_sample( + dataset_uri, + column, + fragment_ids, + sample_size, + storage_options, + block_size, + namespace_impl, + namespace_properties, + table_id, + ): + requested_sample_sizes.append(sample_size) + return samples_by_fragment[tuple(fragment_ids)] + + def fake_get(value): + if isinstance(value, list): + return [item.value for item in value] + return value.value + + fake_ray = SimpleNamespace( + remote=fake_remote, + get=fake_get, + ObjectRef=type("FakeObjectRef", (), {}), + ) + + monkeypatch.setattr(index_mod, "ray", fake_ray) + monkeypatch.setattr( + index_mod, + "_load_vector_training_sample", + fake_load_vector_training_sample, + ) + monkeypatch.setattr( + index_mod, "LanceDataset", lambda *args, **kwargs: FakeDataset() + ) + + ivf_centroids, pq_codebook, num_partitions, num_sub_vectors = ( + index_mod._train_vector_index_artifacts_distributed( + dataset_uri="memory://fake", + column="vector", + index_type="IVF_PQ", + metric="l2", + num_partitions=2, + num_sub_vectors=2, + sample_rate=4, + fragment_batches=[[0], [1]], + num_workers=2, + storage_options={}, + block_size=None, + namespace_impl=None, + namespace_properties=None, + table_id=None, + ray_remote_args=None, + max_iters=1, + tolerance=0.0, + ) + ) + + assert len(ivf_centroids) == 2 + assert len(pq_codebook) == 512 + assert num_partitions == 2 + assert num_sub_vectors == 2 + assert requested_sample_sizes == [512, 512] + + +def test_create_index_rejects_unknown_vector_training_mode(monkeypatch): + """Vector training mode should be explicit and validated before training.""" + + monkeypatch.setattr(index_mod, "_check_pylance_version", lambda: None) + monkeypatch.setattr( + index_mod, "LanceDataset", lambda *args, **kwargs: _FakeDataset() + ) + + with pytest.raises(ValueError, match="training_mode must be one of"): + index_mod.create_index( + uri="memory://fake", + column="vector", + index_type="IVF_FLAT", + training_mode="elsewhere", + ) + + +def test_distributed_training_rejects_unsupported_metric(): + """Distributed KMeans currently supports only l2 and cosine distances.""" + + with pytest.raises( + ValueError, + match="distributed vector training currently supports metric 'l2' or 'cosine'", + ): + index_mod._train_vector_index_artifacts_distributed( + dataset_uri="memory://fake", + column="vector", + index_type="IVF_FLAT", + metric="dot", + num_partitions=2, + num_sub_vectors=None, + sample_rate=4, + fragment_batches=[[0], [1]], + num_workers=2, + storage_options={}, + block_size=None, + namespace_impl=None, + namespace_properties=None, + table_id=None, + ray_remote_args=None, + ) + + +def test_vector_training_sample_uses_reservoir_sampling(monkeypatch): + """Worker-side sampling should not only use the beginning of each shard.""" + + vectors = np.arange(20, dtype=np.float32).reshape(10, 2) + vector_values = index_mod.pa.array(vectors.reshape(-1), type=index_mod.pa.float32()) + vector_array = index_mod.pa.FixedSizeListArray.from_arrays(vector_values, 2) + batches = [ + index_mod.pa.record_batch([vector_array.slice(0, 2)], names=["vector"]), + index_mod.pa.record_batch([vector_array.slice(2, 4)], names=["vector"]), + index_mod.pa.record_batch([vector_array.slice(6, 4)], names=["vector"]), + ] + + class FakeScanner: + def to_batches(self): + return iter(batches) + + class FakeDataset: + def get_fragment(self, fragment_id): + return SimpleNamespace(fragment_id=fragment_id) + + def scanner(self, columns, fragments): + return FakeScanner() + + monkeypatch.setattr( + index_mod, "LanceDataset", lambda *args, **kwargs: FakeDataset() + ) + + sample = index_mod._load_vector_training_sample( + dataset_uri="memory://fake", + column="vector", + fragment_ids=[0], + sample_size=4, + storage_options={}, + block_size=None, + namespace_impl=None, + namespace_properties=None, + table_id=None, + ) + + assert sample.shape == (4, 2) + assert sample[:, 0].max() > vectors[:4, 0].max() + + +def test_combine_kmeans_partials_keeps_empty_centroids(): + """Distributed KMeans reduce should not collapse empty clusters to zero.""" + + previous = np.asarray( + [ + [1.0, 1.0], + [10.0, 10.0], + ], + dtype=np.float32, + ) + partials = [ + { + "sums": np.asarray([[4.0, 8.0], [0.0, 0.0]], dtype=np.float64), + "counts": np.asarray([4, 0], dtype=np.int64), + "loss": 3.5, + } + ] + + centroids, loss = index_mod._combine_kmeans_partials(partials, previous) + + np.testing.assert_allclose(centroids, [[1.0, 2.0], [10.0, 10.0]]) + assert loss == 3.5 + + +def test_combine_pq_partials_keeps_empty_codewords(): + """Distributed PQ reduce should keep old codewords for empty assignments.""" + + previous = np.asarray( + [ + [[1.0, 1.0], [10.0, 10.0]], + [[2.0, 2.0], [20.0, 20.0]], + ], + dtype=np.float32, + ) + partials = [ + { + "sums": np.asarray( + [[[4.0, 8.0], [0.0, 0.0]], [[0.0, 0.0], [10.0, 20.0]]], + dtype=np.float64, + ), + "counts": np.asarray([[4, 0], [0, 2]], dtype=np.int64), + "loss": 2.0, + } + ] + + codebooks, loss = index_mod._combine_pq_partials(partials, previous) + + np.testing.assert_allclose( + codebooks, [[[1.0, 2.0], [10.0, 10.0]], [[2.0, 2.0], [5.0, 10.0]]] + ) + assert loss == 2.0 + + +def test_create_index_commits_worker_segments_by_default(monkeypatch): + """Vector index builds should directly commit worker segments by default.""" + + captured = {} + fake_dataset = _FakeDataset() + + class FakeIndicesBuilder: + dimension = 16 + + def __init__(self, dataset, column): + captured["builder_dataset"] = dataset + captured["builder_column"] = column + + def train_ivf(self, **kwargs): + captured["train_ivf"] = kwargs + return SimpleNamespace(centroids="ivf_centroids", num_partitions=4) + + def fake_handle_vector_fragment_index(**kwargs): + captured["fragment_handler_kwargs"] = kwargs + return lambda fragment_ids: {"status": "success", "fragment_ids": fragment_ids} + + def fake_map_async_with_pool(**kwargs): + captured["map_kwargs"] = kwargs + kwargs["create_fragment_handler"]() + return [ + { + "status": "success", + "fragment_ids": [0], + "segment_index": "segment_0", + }, + { + "status": "success", + "fragment_ids": [1], + "segment_index": "segment_1", + }, + ] + + monkeypatch.setattr(index_mod, "_check_pylance_version", lambda: None) + monkeypatch.setattr(index_mod, "IndicesBuilder", FakeIndicesBuilder) + monkeypatch.setattr(index_mod, "LanceDataset", lambda *args, **kwargs: fake_dataset) + monkeypatch.setattr( + index_mod, + "_handle_vector_fragment_index", + fake_handle_vector_fragment_index, + ) + monkeypatch.setattr( + index_mod, + "_put_vector_index_artifacts_in_object_store", + lambda ivf_centroids, pq_codebook: (ivf_centroids, pq_codebook), + ) + monkeypatch.setattr(index_mod, "_map_async_with_pool", fake_map_async_with_pool) + + updated_dataset = index_mod.create_index( + uri="memory://fake", + column="vector", + index_type="IVF_FLAT", + name="vector_idx", + num_workers=2, + num_partitions=4, + ) + + assert updated_dataset is fake_dataset + assert fake_dataset.segment_builder_calls == 0 + assert fake_dataset.commit_kwargs["segments"] == ["segment_0", "segment_1"] + assert "segment_native" not in captured["fragment_handler_kwargs"] + + +def test_create_index_uses_driver_finalize_when_segment_native_disabled(monkeypatch): + """Vector index builds should use driver finalize when segment-native is disabled.""" + + captured = {} + fake_dataset = _FakeDataset() + + class FakeIndicesBuilder: + dimension = 16 + + def __init__(self, dataset, column): + captured["builder_dataset"] = dataset + captured["builder_column"] = column + + def train_ivf(self, **kwargs): + captured["train_ivf"] = kwargs + return SimpleNamespace(centroids="ivf_centroids", num_partitions=4) + + def fake_handle_vector_fragment_index(**kwargs): + captured["fragment_handler_kwargs"] = kwargs + return lambda fragment_ids: {"status": "success", "fragment_ids": fragment_ids} + + def fake_map_async_with_pool(**kwargs): + captured["map_kwargs"] = kwargs + kwargs["create_fragment_handler"]() + return [ + { + "status": "success", + "fragment_ids": [0], + "segment_index": "segment_0", + }, + { + "status": "success", + "fragment_ids": [1], + "segment_index": "segment_1", + }, + ] + + monkeypatch.setattr(index_mod, "_check_pylance_version", lambda: None) + monkeypatch.setattr(index_mod, "IndicesBuilder", FakeIndicesBuilder) + monkeypatch.setattr(index_mod, "LanceDataset", lambda *args, **kwargs: fake_dataset) + monkeypatch.setattr( + index_mod, + "_handle_vector_fragment_index", + fake_handle_vector_fragment_index, + ) + monkeypatch.setattr( + index_mod, + "_put_vector_index_artifacts_in_object_store", + lambda ivf_centroids, pq_codebook: (ivf_centroids, pq_codebook), + ) + monkeypatch.setattr(index_mod, "_map_async_with_pool", fake_map_async_with_pool) + + updated_dataset = index_mod.create_index( + uri="memory://fake", + column="vector", + index_type="IVF_FLAT", + name="vector_idx", + num_workers=2, + num_partitions=4, + segment_native=False, + ) + + assert updated_dataset is fake_dataset + assert fake_dataset.segment_builder_calls == 1 + assert fake_dataset.last_segment_builder.index_type == "IVF_FLAT" + assert fake_dataset.last_segment_builder.segments == ["segment_0", "segment_1"] + assert fake_dataset.last_segment_builder.build_all_calls == 1 + assert fake_dataset.commit_kwargs["segments"] == ["merged_segment"] + assert "segment_native" not in captured["fragment_handler_kwargs"] + + +def test_create_index_filters_vector_fragments(monkeypatch): + """The vector path should distribute only requested fragment IDs.""" + + captured = {} + fake_dataset = _FakeDataset() + + class FakeIndicesBuilder: + dimension = 16 + + def __init__(self, dataset, column): + captured["builder_dataset"] = dataset + captured["builder_column"] = column + + def train_ivf(self, **kwargs): + captured["train_ivf"] = kwargs + return SimpleNamespace(centroids="ivf_centroids", num_partitions=4) + + def fake_handle_vector_fragment_index(**kwargs): + captured["fragment_handler_kwargs"] = kwargs + return lambda fragment_ids: {"status": "success", "fragment_ids": fragment_ids} + + def fake_map_async_with_pool(**kwargs): + captured["map_kwargs"] = kwargs + kwargs["create_fragment_handler"]() + return [ + { + "status": "success", + "fragment_ids": [1], + "segment_index": "segment_1", + } + ] + + monkeypatch.setattr(index_mod, "_check_pylance_version", lambda: None) + monkeypatch.setattr(index_mod, "IndicesBuilder", FakeIndicesBuilder) + monkeypatch.setattr(index_mod, "LanceDataset", lambda *args, **kwargs: fake_dataset) + monkeypatch.setattr( + index_mod, + "_handle_vector_fragment_index", + fake_handle_vector_fragment_index, + ) + monkeypatch.setattr( + index_mod, + "_put_vector_index_artifacts_in_object_store", + lambda ivf_centroids, pq_codebook: (ivf_centroids, pq_codebook), + ) + monkeypatch.setattr(index_mod, "_map_async_with_pool", fake_map_async_with_pool) + + updated_dataset = index_mod.create_index( + uri="memory://fake", + column="vector", + index_type="IVF_FLAT", + name="vector_idx", + num_workers=2, + num_partitions=4, + fragment_ids=[1], + ) + + assert updated_dataset is fake_dataset + assert captured["map_kwargs"]["fragment_batches"] == [[1]] + assert captured["map_kwargs"]["num_workers"] == 1 + assert "fragment_ids" not in captured["fragment_handler_kwargs"] + assert fake_dataset.commit_kwargs["segments"] == ["segment_1"] + + +def test_create_index_rejects_invalid_vector_fragment_ids(monkeypatch): + """The vector path should fail before training when fragment IDs are invalid.""" + + monkeypatch.setattr(index_mod, "_check_pylance_version", lambda: None) + monkeypatch.setattr( + index_mod, "LanceDataset", lambda *args, **kwargs: _FakeDataset() + ) + + with pytest.raises(ValueError, match=r"Fragment IDs \{99\} do not exist"): + index_mod.create_index( + uri="memory://fake", + column="vector", + index_type="IVF_FLAT", + fragment_ids=[99], + ) + + +def test_create_index_rejects_empty_vector_fragment_ids(monkeypatch): + """The vector path should reject empty fragment selection before training.""" + + class FailingIndicesBuilder: + def __init__(self, dataset, column): + raise AssertionError("fragment_ids should be validated before training") + + monkeypatch.setattr(index_mod, "_check_pylance_version", lambda: None) + monkeypatch.setattr( + index_mod, "LanceDataset", lambda *args, **kwargs: _FakeDataset() + ) + monkeypatch.setattr(index_mod, "IndicesBuilder", FailingIndicesBuilder) + + with pytest.raises(ValueError, match="fragment_ids cannot be empty"): + index_mod.create_index( + uri="memory://fake", + column="vector", + index_type="IVF_FLAT", + fragment_ids=[], + ) + + def test_create_index_rejects_non_positive_sample_rate(monkeypatch): """Invalid sample rates should fail before training starts.""" @@ -281,6 +1010,15 @@ def test_create_index_rejects_non_positive_sample_rate(monkeypatch): ) +def test_check_pylance_version_rejects_old_vector_api(monkeypatch): + """Vector segment APIs require the pylance version declared by the package.""" + + monkeypatch.setattr(index_mod.lance, "__version__", "0.39.106", raising=False) + + with pytest.raises(RuntimeError, match=r"pylance >= 7\.0\.0b7"): + index_mod._check_pylance_version() + + def test_create_scalar_index_passes_block_size_to_loads_and_handler(monkeypatch): """The scalar index path should use block_size whenever it loads a dataset.""" diff --git a/uv.lock b/uv.lock index 34c7e9a6..0f65bc4e 100644 --- a/uv.lock +++ b/uv.lock @@ -358,6 +358,8 @@ source = { editable = "." } dependencies = [ { name = "lance-namespace" }, { name = "more-itertools", marker = "python_full_version < '3.12'" }, + { name = "numpy", version = "2.2.6", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.11'" }, + { name = "numpy", version = "2.3.1", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.11'" }, { name = "packaging" }, { name = "pyarrow" }, { name = "pylance" }, @@ -385,6 +387,7 @@ requires-dist = [ { name = "mkdocs-awesome-pages-plugin", marker = "extra == 'docs'", specifier = ">=2.9.0" }, { name = "mkdocs-material", marker = "extra == 'docs'", specifier = ">=9.0.0" }, { name = "more-itertools", marker = "python_full_version < '3.12'", specifier = ">=2.6.0" }, + { name = "numpy", specifier = ">=1.24" }, { name = "packaging" }, { name = "pyarrow", specifier = ">=17.0.0" }, { name = "pylance", specifier = ">=7.0.0b7" },