Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 181 additions & 0 deletions python/python/tests/test_blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,56 @@ def _external_blob_table(blob_path, payload=b"hello"):
return pa.table({"blob": lance.blob_array([blob_path.as_uri()])})


def _add_columns_blob_v2_values(tmp_path):
external_base = tmp_path / "external_base"
external_blob = external_base / "external_blob.bin"
external_blob.parent.mkdir(parents=True, exist_ok=True)
external_blob.write_bytes(b"external")

payloads = [
b"inline",
b"p" * (64 * 1024 + 1024),
b"d" * (4 * 1024 * 1024 + 1024),
b"external",
]
values = [payloads[0], payloads[1], payloads[2], external_blob.as_uri()]
initial_bases = [DatasetBasePath(external_base.as_uri(), name="external", id=1)]
return values, payloads, initial_bases


def _assert_blob_v2_add_columns_result(dataset, column, payloads):
desc = dataset.to_table(columns=[column]).column(column).chunk(0)

assert desc.field("kind").to_pylist() == [0, 1, 2, 3]
assert desc.field("blob_id").to_pylist()[3] == 1
assert desc.field("blob_uri").to_pylist()[3] == "external_blob.bin"

blobs = dataset.take_blobs(column, indices=range(len(payloads)))
assert [blob.readall() for blob in blobs] == payloads


def _dataset_file_set(dataset_path):
return {
path.relative_to(dataset_path)
for path in dataset_path.rglob("*")
if path.is_file()
}


def _write_two_fragment_blob_v2_seed_dataset(tmp_path, name):
values, payloads, initial_bases = _add_columns_blob_v2_values(tmp_path)
dataset_path = tmp_path / name
ds = lance.write_dataset(
pa.table({"id": range(8)}),
dataset_path,
data_storage_version="2.2",
initial_bases=initial_bases,
max_rows_per_file=4,
max_rows_per_group=4,
)
return ds, dataset_path, values, payloads


def _out_of_order_blob_selection(dataset_with_blobs, selection_kind):
addresses = _blob_row_addresses(dataset_with_blobs)
expected = [(addresses[4], b"quux"), (addresses[0], b"foo")]
Expand Down Expand Up @@ -608,6 +658,137 @@ def test_blob_extension_write_external_ingest_rejects_reference_only_options(tmp
)


def test_blob_extension_add_columns_record_batch_reader_all_kinds(tmp_path):

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we at least add a test case about fail-then-cleanup?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. I'll add a fail-then-cleanup regression test so we cover the partial-write path, not just the success case.

values, payloads, initial_bases = _add_columns_blob_v2_values(tmp_path)
ds = lance.write_dataset(
pa.table({"id": range(4)}),
tmp_path / "test_add_columns_reader_blob_v2",
data_storage_version="2.2",
initial_bases=initial_bases,
)

ds.add_columns(pa.table({"blob": lance.blob_array(values)}).to_reader())

_assert_blob_v2_add_columns_result(ds, "blob", payloads)


@pytest.mark.parametrize(
"failure_mode",
[
pytest.param("raises_after_first_fragment", id="reader_raises_mid_stream"),
pytest.param("wrong_schema", id="reader_yields_wrong_schema"),
pytest.param("too_many_rows", id="reader_produces_too_many_rows"),
],
)
def test_blob_extension_add_columns_record_batch_reader_failure_cleans_files(
tmp_path,
failure_mode,
):
ds, dataset_path, values, payloads = _write_two_fragment_blob_v2_seed_dataset(
tmp_path,
f"test_add_columns_reader_blob_v2_fail_cleanup_{failure_mode}",
)
external_blob_path = tmp_path / "external_base" / "external_blob.bin"
files_before = _dataset_file_set(dataset_path)

schema = pa.schema([lance.blob_field("blob")])
first_fragment_batch = pa.record_batch([lance.blob_array(values)], schema=schema)
second_fragment_batch = pa.record_batch([lance.blob_array(values)], schema=schema)

if failure_mode == "raises_after_first_fragment":
match = "reader failed after first fragment"

def failing_reader():
yield first_fragment_batch
raise RuntimeError("reader failed after first fragment")

elif failure_mode == "wrong_schema":
match = "field names"

def failing_reader():
yield first_fragment_batch
yield pa.record_batch([pa.array(range(4))], ["not_blob"])

else:
match = "Stream produced more values than expected for dataset"

def failing_reader():
yield first_fragment_batch
yield second_fragment_batch
yield pa.record_batch([lance.blob_array([payloads[0]])], schema=schema)

with pytest.raises(OSError, match=match):
ds.add_columns(failing_reader(), reader_schema=schema)

assert ds.version == 1
assert _dataset_file_set(dataset_path) == files_before
assert external_blob_path.exists()


def test_blob_extension_add_columns_batch_udf_failure_cleans_files(tmp_path):
ds, dataset_path, values, _ = _write_two_fragment_blob_v2_seed_dataset(
tmp_path,
"test_add_columns_udf_blob_v2_fail_cleanup",
)
external_blob_path = tmp_path / "external_base" / "external_blob.bin"
files_before = _dataset_file_set(dataset_path)
call_count = 0

@lance.batch_udf(output_schema=pa.schema([lance.blob_field("blob")]))
def fail_on_second_fragment(batch):
nonlocal call_count
call_count += 1
if call_count == 2:
raise RuntimeError("udf failed after first fragment")
blob_values = [values[row.as_py() % len(values)] for row in batch["id"]]
return pa.record_batch(
[lance.blob_array(blob_values)],
["blob"],
)

with pytest.raises(OSError, match="udf failed after first fragment"):
ds.add_columns(fail_on_second_fragment, read_columns=["id"], batch_size=4)

assert call_count == 2
assert ds.version == 1
assert _dataset_file_set(dataset_path) == files_before
assert external_blob_path.exists()


def test_blob_extension_add_columns_batch_udf_all_kinds(tmp_path):
values, payloads, initial_bases = _add_columns_blob_v2_values(tmp_path)
ds = lance.write_dataset(
pa.table({"id": range(4)}),
tmp_path / "test_add_columns_udf_blob_v2",
data_storage_version="2.2",
initial_bases=initial_bases,
)

@lance.batch_udf(output_schema=pa.schema([lance.blob_field("blob")]))
def make_blob_column(batch):
return pa.record_batch(
[lance.blob_array([values[row.as_py()] for row in batch["id"]])],
["blob"],
)

ds.add_columns(make_blob_column, read_columns=["id"])

_assert_blob_v2_add_columns_result(ds, "blob", payloads)


def test_blob_extension_add_columns_all_nulls_blob_v2(tmp_path):
ds = lance.write_dataset(
pa.table({"id": range(4)}),
tmp_path / "test_add_columns_all_nulls_blob_v2",
data_storage_version="2.2",
)

ds.add_columns(lance.blob_field("blob"))

assert ds.to_table(columns=["blob"]).column("blob").to_pylist() == [None] * 4
assert ds.take_blobs("blob", indices=range(4)) == []


def test_blob_extension_write_fragments_external_denied_by_default(tmp_path):
blob_path = tmp_path / "external_blob.bin"

Expand Down
2 changes: 1 addition & 1 deletion rust/lance/src/dataset/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1792,7 +1792,7 @@ impl FileFragment {
read_columns: Option<Vec<String>>,
batch_size: Option<u32>,
) -> Result<(Fragment, Schema)> {
let (fragments, schema) = schema_evolution::add_columns_to_fragments(
let (fragments, schema, _) = schema_evolution::add_columns_to_fragments(

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we keep fragments_to_cleanup? So that the outer can do some cleanup work.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I think this is a broader follow-up than the current blob-v2 add_columns fix.

This PR is scoped to the dataset-level add_columns cleanup path. Keeping
fragments_to_cleanup in Fragment::add_columns would effectively extend the
fragment-level API / ownership boundary so the outer caller can decide how to
clean up on later failures.

I’d prefer to keep this PR focused and handle that as a separate follow-up PR,
unless you feel it is required to land this fix safely.

self.dataset.as_ref(),
transforms,
read_columns,
Expand Down
Loading
Loading