# Private index-type helpers added to the dataset class in
# python/python/lance/dataset.py. They are class members (the two predicates
# dispatch through ``cls``), so they belong inside that class body.


@staticmethod
def _normalized_index_type(
    index_type: Union[str, IndexConfig],
) -> str:
    """Return the upper-cased name of an index type.

    Parameters
    ----------
    index_type : str or IndexConfig
        Either the index type name itself, or a config object whose
        ``index_type`` attribute holds the name.

    Returns
    -------
    str
        The name converted with ``str.upper()``.
    """
    if isinstance(index_type, IndexConfig):
        index_type = index_type.index_type
    return index_type.upper()


@classmethod
def _is_segment_native_scalar_index_type(
    cls,
    index_type: Union[str, IndexConfig],
) -> bool:
    """Tell whether ``index_type`` names a segment-native scalar index.

    ``create_index_uncommitted`` uses this to decide whether a request is a
    scalar segment request. Matching is case-insensitive.

    Parameters
    ----------
    index_type : str or IndexConfig
        The requested index type.

    Returns
    -------
    bool
        True for BTREE, BITMAP, INVERTED, FTS and ZONEMAP. False for every
        other name, and for values that are neither ``str`` nor
        ``IndexConfig``.
    """
    if not isinstance(index_type, (str, IndexConfig)):
        # Not a scalar segment request at all. Answering False lets the
        # caller fall through to its own handling instead of failing here
        # with an AttributeError on ``.upper()``.
        return False
    return cls._normalized_index_type(index_type) in {
        "BTREE",
        "BITMAP",
        "INVERTED",
        "FTS",
        "ZONEMAP",
    }


@classmethod
def _requires_uncommitted_scalar_index(
    cls,
    index_type: Union[str, IndexConfig],
) -> bool:
    """Tell whether per-fragment builds must use ``create_index_uncommitted``.

    ``create_scalar_index`` raises ``ValueError`` when ``fragment_ids`` is
    given together with an index type for which this returns True. Matching
    is case-insensitive.

    Parameters
    ----------
    index_type : str or IndexConfig
        The requested index type.

    Returns
    -------
    bool
        True for BTREE, BITMAP and ZONEMAP. False for every other name, and
        for values that are neither ``str`` nor ``IndexConfig``.
    """
    if not isinstance(index_type, (str, IndexConfig)):
        # Same guard as the sibling predicate: an unrecognised value is
        # never one of the listed types.
        return False
    return cls._normalized_index_type(index_type) in {
        "BTREE",
        "BITMAP",
        "ZONEMAP",
    }
def test_zonemap_fragment_ids_parameter_validation(tmp_path):
    """ZONEMAP plus ``fragment_ids`` is rejected by ``create_scalar_index``.

    The raised ``ValueError`` must mention ``create_index_uncommitted``.
    """
    dataset = generate_multi_fragment_dataset(
        tmp_path, num_fragments=2, rows_per_fragment=100
    )
    all_fragment_ids = [frag.fragment_id for frag in dataset.get_fragments()]
    request = {
        "column": "id",
        "index_type": "ZONEMAP",
        "fragment_ids": [all_fragment_ids[0]],
    }

    with pytest.raises(ValueError, match="create_index_uncommitted"):
        dataset.create_scalar_index(**request)


def test_zonemap_segment_merge_and_commit_from_python(tmp_path):
    """Build one ZONEMAP segment per fragment, merge them, commit, and query.

    Checks the file layout and fragment coverage of the staged and merged
    segments, the committed index description, and that a filtered scan
    returns the same rows with and without the scalar index.
    """
    rows_per_fragment = 20_000
    index_name = "id_zonemap_segments"
    dataset = generate_multi_fragment_dataset(
        tmp_path, num_fragments=4, rows_per_fragment=rows_per_fragment
    )
    fragment_ids = [frag.fragment_id for frag in dataset.get_fragments()]

    # Stage one uncommitted segment per fragment.
    staged_segments = []
    for frag_id in fragment_ids:
        staged_segments.append(
            dataset.create_index_uncommitted(
                column="id",
                index_type="ZONEMAP",
                name=index_name,
                fragment_ids=[frag_id],
            )
        )

    # Every staged segment must have its own uuid.
    staged_uuids = {seg.uuid for seg in staged_segments}
    assert len(staged_uuids) == len(staged_segments)

    for seg, frag_id in zip(staged_segments, fragment_ids):
        seg_files = seg.files
        assert seg_files is not None
        assert seg.fragment_ids == {frag_id}
        seg_paths = [entry.path for entry in seg_files]
        assert "zonemap.lance" in seg_paths
        assert not any(path.startswith("part_") for path in seg_paths)

    # Merging yields a new segment that covers all fragments.
    merged_segment = dataset.merge_existing_index_segments(staged_segments)
    merged_files = merged_segment.files
    assert merged_files is not None
    assert merged_segment.uuid not in {seg.uuid for seg in staged_segments}
    assert merged_segment.fragment_ids == set(fragment_ids)
    merged_paths = [entry.path for entry in merged_files]
    assert "zonemap.lance" in merged_paths
    assert not any(path.startswith("part_") for path in merged_paths)

    committed = dataset.commit_existing_index_segments(
        index_name, "id", [merged_segment]
    )
    by_name = {desc.name: desc for desc in committed.describe_indices()}
    zonemap_description = by_name[index_name]
    assert zonemap_description.index_type == "ZoneMap"
    assert len(by_name[index_name].segments) == 1

    filter_expr = "id >= 8200 AND id < 8300"

    def read_matching_rows(use_index):
        # Same scan both times; only the scalar-index switch differs.
        return committed.scanner(
            filter=filter_expr,
            columns=["id", "text"],
            use_scalar_index=use_index,
        ).to_table()

    without_index = read_matching_rows(False)
    with_index = read_matching_rows(True)

    assert with_index.num_rows == without_index.num_rows
    assert with_index["id"].to_pylist() == without_index["id"].to_pylist()

    indexed_plan = committed.scanner(
        filter=filter_expr, use_scalar_index=True
    ).explain_plan()
    assert "ScalarIndexQuery" in indexed_plan