Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -3074,6 +3074,14 @@ def _prepare_scalar_index_request(
else:
raise Exception("index_type must be str or IndexConfig")

@staticmethod
def _is_segment_native_scalar_index_type(
index_type: Union[str, IndexConfig],
) -> bool:
if isinstance(index_type, IndexConfig):
index_type = index_type.index_type
return index_type.upper() in {"BTREE", "BITMAP", "ZONEMAP"}

def create_scalar_index(
self,
column: str,
Expand Down Expand Up @@ -3291,7 +3299,9 @@ def create_scalar_index(
column, index_type, kwargs
)

if fragment_ids is not None and logical_index_type in {"BTREE", "BITMAP"}:
if fragment_ids is not None and self._is_segment_native_scalar_index_type(
logical_index_type
):
raise ValueError(
f"{logical_index_type} distributed indexing uses "
"create_index_uncommitted(..., "
Expand Down Expand Up @@ -4003,8 +4013,8 @@ def create_index_uncommitted(
"""
Create one segment without publishing it and return its metadata.

This is the public distributed-build API for vector, BTREE scalar,
and canonical bitmap scalar index construction. Unlike
This is the public distributed-build API for vector index construction
and BTREE, canonical BITMAP, and ZONEMAP scalar segment construction. Unlike
:meth:`create_index`, this method does not publish the index into the
dataset manifest. Instead, it writes one segment under
``_indices/<segment_uuid>/`` and returns the resulting
Expand Down Expand Up @@ -4049,11 +4059,8 @@ def create_index_uncommitted(
Index
Metadata for the segment that was written by this call.
"""
is_scalar_segment_request = (
isinstance(index_type, str) and index_type.upper() in {"BTREE", "BITMAP"}
) or (
isinstance(index_type, IndexConfig)
and index_type.index_type.upper() in {"BTREE", "BITMAP"}
is_scalar_segment_request = self._is_segment_native_scalar_index_type(
index_type
)
if is_scalar_segment_request:
if fragment_ids is None:
Expand Down
65 changes: 65 additions & 0 deletions python/python/tests/test_scalar_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -4008,6 +4008,71 @@ def test_bitmap_uncommitted_segments_can_be_committed_from_python(tmp_path):
)


def test_zonemap_segment_merge_and_commit_from_python(tmp_path):
ds = generate_multi_fragment_dataset(
tmp_path, num_fragments=4, rows_per_fragment=40

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add the number about rows_per_fragment to be larger than 8192(e.g. > 2 * 8192), so that we can have two zones in one fragment? Because the default value of rows_per_zone is 8192.

)

index_name = "id_zonemap_segments"
fragment_ids = [fragment.fragment_id for fragment in ds.get_fragments()]
with pytest.raises(ValueError, match="create_index_uncommitted"):
ds.create_scalar_index(
column="id",
index_type="ZONEMAP",
fragment_ids=[fragment_ids[0]],
)
Comment on lines +4018 to +4023

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, it would be better to add a test case named .e.g test_zonemap_fragment_ids_parameter_validation?


staged_segments = [
ds.create_index_uncommitted(
column="id",
index_type="ZONEMAP",
name=index_name,
fragment_ids=[fragment_id],
)
for fragment_id in fragment_ids
]

assert len({segment.uuid for segment in staged_segments}) == len(staged_segments)
for segment, fragment_id in zip(staged_segments, fragment_ids):
files = segment.files
assert files is not None
assert segment.fragment_ids == {fragment_id}
assert any(file.path == "zonemap.lance" for file in files)
assert all(not file.path.startswith("part_") for file in files)

merged_segment = ds.merge_existing_index_segments(staged_segments)
merged_files = merged_segment.files
assert merged_files is not None
assert merged_segment.uuid not in {segment.uuid for segment in staged_segments}
assert merged_segment.fragment_ids == set(fragment_ids)
assert any(file.path == "zonemap.lance" for file in merged_files)
assert all(not file.path.startswith("part_") for file in merged_files)

ds = ds.commit_existing_index_segments(index_name, "id", [merged_segment])
descriptions = {index.name: index for index in ds.describe_indices()}
assert descriptions[index_name].index_type == "ZoneMap"
assert len(descriptions[index_name].segments) == 1

filter_expr = "id >= 20 AND id < 90"
without_index = ds.scanner(
filter=filter_expr,
columns=["id", "text"],
use_scalar_index=False,
).to_table()
with_index = ds.scanner(
filter=filter_expr,
columns=["id", "text"],
use_scalar_index=True,
).to_table()

assert with_index.num_rows == without_index.num_rows
assert with_index["id"].to_pylist() == without_index["id"].to_pylist()
assert (
"ScalarIndexQuery"
in ds.scanner(filter=filter_expr, use_scalar_index=True).explain_plan()
)


def test_merge_index_metadata_btree_soft_break(tmp_path):
ds = generate_multi_fragment_dataset(
tmp_path, num_fragments=2, rows_per_fragment=100
Expand Down
Loading