diff --git a/docs/src/write.md b/docs/src/write.md index feb3cf87..21a5d34d 100644 --- a/docs/src/write.md +++ b/docs/src/write.md @@ -24,7 +24,7 @@ Write a Ray Dataset to Lance format. - `namespace`: LanceNamespace instance for metadata catalog integration (requires table_id) - `table_id`: Table identifier as list of strings (requires namespace) - `schema`: Optional PyArrow schema -- `mode`: Write mode - "create", "append", or "overwrite" +- `mode`: Write mode - "create" (default), "append", or "overwrite". "create" and "overwrite" write data using the incoming schema, so new columns are persisted (schema evolution); "append" adds data under the existing dataset schema without evolving it (in the default non-streaming write, columns not in the existing schema are dropped; with `stream=True` they raise a schema-mismatch error) - `target_bases`: Optional list of registered base names or base path URIs where new data files should be written. In `create` mode, entries must match `initial_bases`; in `append` and `overwrite` modes, entries must match bases already registered in the dataset manifest - `min_rows_per_file`: Minimum rows per file (default: 1024 * 1024) - `max_rows_per_file`: Maximum rows per file (default: 64 * 1024 * 1024) diff --git a/lance_ray/datasink.py b/lance_ray/datasink.py index 5921439d..c0e7f81f 100644 --- a/lance_ray/datasink.py +++ b/lance_ray/datasink.py @@ -191,7 +191,26 @@ def on_write_complete( for fragment_str, schema_str in batch: fragment = pickle.loads(fragment_str) fragments.append(fragment) - schema = pickle.loads(schema_str) + frag_schema = pickle.loads(schema_str) + # In create/overwrite mode pylance assigns field ids by column + # position, so every block must share one column order. If + # blocks differ (e.g. a union of differently ordered sources), + # committing their fragments under a single schema would read + # the data back transposed. Fail loudly instead of silently + # corrupting; an explicit ``schema`` aligns every block by name. + if ( + schema is not None + and self.mode in {"create", "overwrite"} + and frag_schema.names != schema.names + ): + raise ValueError( + "Distributed create/overwrite received blocks with " + f"inconsistent column order ({frag_schema.names} vs " + f"{schema.names}); their fragments would be committed " + "under a single schema and read back transposed. Pass " + "an explicit `schema=` so every block is aligned by name." + ) + schema = frag_schema # Check weather writer has fragments or not. # Skip commit when there are no fragments. if not schema: @@ -239,8 +258,11 @@ class LanceDatasink(_BaseLanceDatasink): schema : pyarrow.Schema, optional. The schema of the dataset. mode : str, optional - The write mode. Default is 'append'. - Choices are 'append', 'create', 'overwrite'. + The write mode. Default is 'create'. + Choices are 'create', 'append', 'overwrite'. "create" and "overwrite" + write data using the incoming schema (schema evolution), while + "append" validates against the existing dataset schema and drops + columns not present in it. min_rows_per_file : int, optional The minimum number of rows per file. Default is 1024 * 1024. max_rows_per_file : int, optional @@ -367,6 +389,7 @@ def write( blocks, self.uri, schema=self.schema, + mode=self.mode, max_rows_per_file=self.max_rows_per_file, data_storage_version=self.data_storage_version, storage_options=self.storage_options, diff --git a/lance_ray/fragment.py b/lance_ray/fragment.py index 6c60020f..ffd9bfa6 100644 --- a/lance_ray/fragment.py +++ b/lance_ray/fragment.py @@ -40,6 +40,7 @@ def write_fragment( uri: str, *, schema: Optional[pa.Schema] = None, + mode: Literal["create", "append", "overwrite"] = "append", max_rows_per_file: int = 64 * 1024 * 1024, max_bytes_per_file: Optional[int] = None, max_rows_per_group: int = 1024, # Only useful for v1 writer. @@ -55,6 +56,36 @@ def write_fragment( table_id: Optional[list[str]] = None, retry_params: Optional[dict[str, Any]] = None, ) -> list[tuple["FragmentMetadata", pa.Schema]]: + """Write a stream of blocks into one or more uncommitted Lance fragments. + + The returned fragments are not yet part of any dataset version; callers + commit them via a ``LanceOperation`` (e.g. ``Append`` or ``Overwrite``). + + Parameters + ---------- + stream : iterable of pyarrow.Table or pandas.DataFrame + The blocks of data to write. An empty stream yields no fragments. + uri : str + The dataset URI the fragments are written for. + schema : pyarrow.Schema, optional + The schema of the data. If None, it is inferred from the first block. + mode : {"create", "append", "overwrite"}, default "append" + Fragment write mode passed to pylance. "append" validates fragments + against the existing dataset schema (rejecting columns not present in + it); "create" and "overwrite" assign new field ids for the incoming schema + (schema evolution). Note the high-level ``write_lance`` defaults to + "create", while this lower-level helper defaults to "append" to preserve + append-validation semantics for direct callers. + + Returns + ------- + list of (FragmentMetadata, pyarrow.Schema) + One entry per written fragment, paired with the schema to commit. + + Notes + ----- + The remaining parameters mirror :class:`LanceFragmentWriter`. + """ from lance.dependencies import _PANDAS_AVAILABLE from lance.dependencies import pandas as pd from lance.fragment import DEFAULT_MAX_BYTES_PER_FILE, write_fragments @@ -122,6 +153,7 @@ def record_batch_converter(): reader, uri, schema=schema, + mode=mode, max_rows_per_file=max_rows_per_file, max_rows_per_group=max_rows_per_group, max_bytes_per_file=max_bytes_per_file, @@ -280,6 +312,13 @@ class LanceFragmentWriter: The version of the data storage format to use. Newer versions are more efficient but require newer versions of lance to read. The default (None) will use the 2.0 version. See the user guide for more details. + mode : {"create", "append", "overwrite"}, default "append" + Fragment write mode passed to pylance. "append" validates fragments + against the existing dataset schema (rejecting columns not present in + it); "create" and "overwrite" assign new field ids for the incoming schema + (schema evolution). Note the high-level ``write_lance`` defaults to + "create"; this writer defaults to "append" to preserve append-validation + semantics for direct callers. use_legacy_format : optional, bool, default None Deprecated method for setting the data storage version. Use the `data_storage_version` parameter instead. @@ -313,6 +352,15 @@ class LanceFragmentWriter: If provided, should contain keys like 'description', 'match', 'max_attempts', and 'max_backoff_s'. + Notes + ----- + When pairing this writer with :class:`LanceFragmentCommitter`, pass the + SAME ``mode`` to both: this writer assigns field ids (so "create"/ + "overwrite" enable schema evolution), while the committer selects the + commit operation (append vs overwrite). Using the default ``mode="append"`` + writer with a ``mode="overwrite"`` committer on an evolved schema raises a + schema-mismatch error rather than evolving the schema. + """ def __init__( @@ -325,6 +373,7 @@ def __init__( max_bytes_per_file: Optional[int] = None, max_rows_per_group: Optional[int] = None, # Only useful for v1 writer. data_storage_version: Optional[str] = None, + mode: Literal["create", "append", "overwrite"] = "append", use_legacy_format: Optional[bool] = False, storage_options: Optional[dict[str, Any]] = None, base_store_params: Optional[dict[str, dict[str, Any]]] = None, @@ -363,6 +412,7 @@ def __init__( self.max_rows_per_file = max_rows_per_file self.max_bytes_per_file = max_bytes_per_file self.data_storage_version = data_storage_version + self.mode = mode self.storage_options = storage_options self.base_store_params = base_store_params self.initial_bases = normalize_initial_bases(initial_bases) @@ -402,6 +452,7 @@ def __call__(self, batch: Union[pa.Table, "pd.DataFrame", dict]) -> pa.Table: transformed, self.uri, schema=self.schema, + mode=self.mode, max_rows_per_file=self.max_rows_per_file, max_rows_per_group=self.max_rows_per_group, max_bytes_per_file=self.max_bytes_per_file, diff --git a/lance_ray/io.py b/lance_ray/io.py index 27b3dab2..239e94c9 100644 --- a/lance_ray/io.py +++ b/lance_ray/io.py @@ -200,8 +200,18 @@ def write_lance( with namespace parameters when creating a new dataset (mode='create' or 'overwrite'). table_id: The table identifier as a list of strings. Must be provided together with namespace_impl and namespace_properties. - schema: The schema of the dataset. If not provided, it is inferred from the data. - mode: The write mode. Can be "create", "append", or "overwrite". + schema: The schema of the dataset. If not provided, it is inferred from the + data. For distributed create/overwrite writes, provide a schema when + input blocks may differ in column order (e.g. a union of differently + ordered sources); otherwise the write raises rather than risk writing + transposed data. + mode: The write mode (default "create"). Can be "create", "append", or + "overwrite". "create" writes a new dataset and "overwrite" replaces an + existing one; both write data using the schema of the incoming data, so + new columns are persisted (schema evolution). "append" adds data under + the existing dataset schema and does not evolve it: in the default + non-streaming write, columns not present in the existing schema are + dropped, while with ``stream=True`` they raise a schema-mismatch error. min_rows_per_file: The minimum number of rows per file. max_rows_per_file: The maximum number of rows per file. data_storage_version: The version of the data storage format to use. Newer versions are more @@ -341,12 +351,17 @@ def write_lance( fragment_initial_bases = ( initial_bases if mode == "create" and not first_commit_done else None ) + # Only the first committed batch carries the requested mode so that + # create/overwrite assign new field ids for the incoming schema. Every + # later batch appends to the dataset just created/overwritten above. + fragment_mode = "append" if first_commit_done else mode writer = LanceFragmentWriter( uri=dest_uri, schema=schema, # if None, writer infers from first batch (preserves Arrow metadata) max_rows_per_file=max_rows_per_file, max_rows_per_group=min_rows_per_file, # keep naming aligned with v1 semantics data_storage_version=data_storage_version, + mode=fragment_mode, storage_options=storage_options, base_store_params=base_store_params, initial_bases=fragment_initial_bases, diff --git a/lance_ray/pandas.py b/lance_ray/pandas.py index 9ddf5157..51199a2f 100644 --- a/lance_ray/pandas.py +++ b/lance_ray/pandas.py @@ -30,5 +30,12 @@ def pd_to_arrow( new_table = tbl.replace_schema_metadata(new_schema.metadata) return new_table elif isinstance(df, pa.Table) and df.num_rows > 0 and schema is not None: + # Align columns to the target schema by NAME before casting. A bare + # df.cast(schema) requires the table's field names to already match the + # schema's names in order and otherwise raises ValueError. Selecting by + # name first reorders/projects the columns so the cast lines the right + # data up with each field. + if df.schema.names != schema.names: + df = df.select(schema.names) return df.cast(schema) return df diff --git a/tests/test_basic_read_write.py b/tests/test_basic_read_write.py index ad2502a4..9681be52 100644 --- a/tests/test_basic_read_write.py +++ b/tests/test_basic_read_write.py @@ -267,6 +267,236 @@ def test_overwrite_mode(self, sample_dataset, temp_dir): overwritten_df = overwritten_dataset.to_pandas() assert len(overwritten_df) == 2 # Should have 2 rows after overwrite + def test_overwrite_mode_allows_schema_change(self, temp_dir): + """Overwrite writes fragments with the new schema, not append validation.""" + path = Path(temp_dir) / "overwrite_schema_change.lance" + original_data = pd.DataFrame({"id": [1, 2], "value": [10, 20]}) + lr.write_lance(ray.data.from_pandas(original_data), str(path)) + + ray_ds = lr.read_lance(str(path)) + updated_ds = ray_ds.map_batches( + lambda batch: batch.assign(id_2=batch["id"] * 2), + batch_format="pandas", + ) + + lr.write_lance(updated_ds, str(path), mode="overwrite") + + result = lr.read_lance(str(path)).to_pandas().sort_values("id") + assert result["id"].tolist() == [1, 2] + assert result["id_2"].tolist() == [2, 4] + + def test_stream_overwrite_mode_allows_schema_change(self, temp_dir): + """Streaming overwrite also writes fragments with the new schema.""" + path = Path(temp_dir) / "stream_overwrite_schema_change.lance" + original_data = pd.DataFrame({"id": [1, 2], "value": [10, 20]}) + lr.write_lance(ray.data.from_pandas(original_data), str(path)) + + updated_data = pd.DataFrame({"id": [3, 4], "value": [30, 40], "id_2": [6, 8]}) + lr.write_lance( + ray.data.from_pandas(updated_data), + str(path), + mode="overwrite", + stream=True, + batch_size=1, + ) + + result = lr.read_lance(str(path)).to_pandas().sort_values("id") + assert result["id"].tolist() == [3, 4] + assert result["id_2"].tolist() == [6, 8] + + def test_append_mode_uses_existing_schema(self, temp_dir): + """Append keeps the existing schema instead of evolving it.""" + path = Path(temp_dir) / "append_schema_change.lance" + original_data = pd.DataFrame({"id": [1, 2], "value": [10, 20]}) + lr.write_lance(ray.data.from_pandas(original_data), str(path)) + + additional_data = pd.DataFrame({"id": [3], "value": [30], "id_2": [6]}) + lr.write_lance( + ray.data.from_pandas(additional_data), + str(path), + mode="append", + ) + + result = lr.read_lance(str(path)).to_pandas().sort_values("id") + assert result.columns.tolist() == ["id", "value"] + assert result["id"].tolist() == [1, 2, 3] + + def test_overwrite_multi_fragment_schema_change(self, temp_dir): + """Distributed overwrite across many fragments evolves the schema and + writes complete, correct data. + + Forcing many small fragments exercises the path where independent Ray + write tasks each produce overwrite-mode fragments that are committed + together under one schema. It guards the per-fragment ``mode`` plumbing: + if overwrite fell back to append validation, the write would reject the + new ``id_2`` column. + """ + path = Path(temp_dir) / "overwrite_multi_fragment.lance" + original_data = pd.DataFrame( + {"id": list(range(12)), "value": [x * 10 for x in range(12)]} + ) + lr.write_lance(ray.data.from_pandas(original_data), str(path)) + + updated_ds = ( + lr.read_lance(str(path)) + .repartition(4) + .map_batches( + lambda batch: batch.assign(id_2=batch["id"] * 2), + batch_format="pandas", + ) + ) + lr.write_lance( + updated_ds, + str(path), + mode="overwrite", + min_rows_per_file=1, + max_rows_per_file=2, + ) + + dataset = lance.dataset(str(path)) + assert len(dataset.get_fragments()) > 1 # truly multi-fragment + assert dataset.schema.names == ["id", "value", "id_2"] + + result = lr.read_lance(str(path)).to_pandas().sort_values("id") + assert result["id"].tolist() == list(range(12)) + assert result["id_2"].tolist() == [x * 2 for x in range(12)] + assert result["id_2"].notna().all() # every fragment carries the new column + + def test_overwrite_mode_drops_column(self, temp_dir): + """Overwrite with fewer columns replaces the schema, dropping the rest.""" + path = Path(temp_dir) / "overwrite_drop_column.lance" + original_data = pd.DataFrame({"id": [1, 2], "value": [10, 20]}) + lr.write_lance(ray.data.from_pandas(original_data), str(path)) + + reduced_data = pd.DataFrame({"id": [3, 4]}) + lr.write_lance(ray.data.from_pandas(reduced_data), str(path), mode="overwrite") + + dataset = lance.dataset(str(path)) + assert dataset.schema.names == ["id"] + result = lr.read_lance(str(path)).to_pandas().sort_values("id") + assert result["id"].tolist() == [3, 4] + + def test_overwrite_mode_changes_column_type(self, temp_dir): + """Overwrite can change a column's type (int -> string).""" + path = Path(temp_dir) / "overwrite_type_change.lance" + original_data = pd.DataFrame({"id": [1, 2], "value": [10, 20]}) + lr.write_lance(ray.data.from_pandas(original_data), str(path)) + + retyped_data = pd.DataFrame({"id": [3, 4], "value": ["a", "b"]}) + lr.write_lance(ray.data.from_pandas(retyped_data), str(path), mode="overwrite") + + dataset = lance.dataset(str(path)) + value_type = dataset.schema.field("value").type + assert pa.types.is_string(value_type) or pa.types.is_large_string(value_type) + result = lr.read_lance(str(path)).to_pandas().sort_values("id") + assert result["value"].tolist() == ["a", "b"] + + def test_append_incompatible_type_raises(self, temp_dir): + """Append coerces each block to the existing dataset schema, so a + type-incompatible value fails the write. + + Append pins the existing schema (int64 ``value``) and casts each block to + it; the string value cannot be cast to int64, so the write raises. With + overwrite the same data would instead evolve ``value`` to string. + """ + path = Path(temp_dir) / "append_type_conflict.lance" + original_data = pd.DataFrame({"id": [1, 2], "value": [10, 20]}) + lr.write_lance(ray.data.from_pandas(original_data), str(path)) + + conflicting_data = pd.DataFrame({"id": [3], "value": ["not_an_int"]}) + with pytest.raises(OSError): + lr.write_lance( + ray.data.from_pandas(conflicting_data), str(path), mode="append" + ) + + def test_stream_overwrite_resume_rows_schema_change(self, temp_dir): + """Streaming overwrite still evolves the schema when resume_rows skips the + first batch: the first *committed* batch must carry the overwrite mode.""" + path = Path(temp_dir) / "stream_overwrite_resume.lance" + original_data = pd.DataFrame({"id": [1, 2], "value": [10, 20]}) + lr.write_lance(ray.data.from_pandas(original_data), str(path)) + + updated_data = pd.DataFrame( + {"id": [3, 4, 5], "value": [30, 40, 50], "id_2": [6, 8, 10]} + ) + lr.write_lance( + ray.data.from_pandas(updated_data), + str(path), + mode="overwrite", + stream=True, + batch_size=1, + resume_rows=1, + ) + + result = lr.read_lance(str(path)).to_pandas().sort_values("id") + assert result["id"].tolist() == [4, 5] # first input row skipped + assert result["id_2"].tolist() == [8, 10] # new column evolved on overwrite + + def test_stream_append_extra_column_raises(self, temp_dir): + """Streaming append rejects columns absent from the existing schema. + + Unlike the non-streaming write (which pins the existing schema and drops + such columns), the streaming path infers the schema from the batch, so + pylance's append validation raises on the unexpected column. + """ + path = Path(temp_dir) / "stream_append_extra.lance" + original_data = pd.DataFrame({"id": [1, 2], "value": [10, 20]}) + lr.write_lance(ray.data.from_pandas(original_data), str(path)) + + extra_col_data = pd.DataFrame({"id": [3], "value": [30], "id_2": [6]}) + with pytest.raises(OSError): + lr.write_lance( + ray.data.from_pandas(extra_col_data), + str(path), + mode="append", + stream=True, + batch_size=1, + ) + + def test_create_inconsistent_column_order_raises(self, temp_dir): + """Distributed create with blocks in different column orders raises + instead of silently transposing data. + + pylance assigns field ids by column position in create/overwrite mode, + so fragments written by blocks with divergent column order (e.g. a + union of differently ordered sources) cannot all match one committed + schema. The write must fail loudly rather than corrupt the data. + """ + path = Path(temp_dir) / "inconsistent_order.lance" + ds1 = ray.data.from_items([{"a": i, "b": i * 10} for i in range(6)]) + ds2 = ray.data.from_items([{"b": i * 10, "a": i} for i in range(6, 12)]) + with pytest.raises(ValueError, match="inconsistent column order"): + lr.write_lance( + ds1.union(ds2), + str(path), + mode="create", + concurrency=4, + min_rows_per_file=1, + max_rows_per_file=2, + ) + + def test_create_inconsistent_order_with_explicit_schema(self, temp_dir): + """An explicit schema aligns differently-ordered blocks by name, so the + distributed create writes correctly with no transposition.""" + path = Path(temp_dir) / "inconsistent_order_fixed.lance" + ds1 = ray.data.from_items([{"a": i, "b": i * 10} for i in range(6)]) + ds2 = ray.data.from_items([{"b": i * 10, "a": i} for i in range(6, 12)]) + schema = pa.schema([("a", pa.int64()), ("b", pa.int64())]) + lr.write_lance( + ds1.union(ds2), + str(path), + mode="create", + schema=schema, + concurrency=4, + min_rows_per_file=1, + max_rows_per_file=2, + ) + result = ( + lr.read_lance(str(path)).to_pandas().sort_values("a").reset_index(drop=True) + ) + assert len(result) == 12 + assert (result["b"] == result["a"] * 10).all() # no transposition + def test_read_lance_with_fragment_ids(self, sample_dataset, temp_dir): """Test reading with fragment IDs.""" path = Path(temp_dir) / "fragment_ids_test.lance" @@ -335,6 +565,47 @@ def test_write_and_read_with_directory_namespace(self, sample_data, temp_dir): pd.testing.assert_frame_equal(original_sorted, read_sorted) + def test_overwrite_with_directory_namespace_allows_schema_change( + self, sample_data, temp_dir + ): + """Namespace overwrite writes fragments with the new schema.""" + table_id = ["schema_change_table"] + + lr.write_lance( + ray.data.from_pandas(sample_data[["id", "score"]]), + namespace_impl="dir", + namespace_properties={"root": temp_dir}, + table_id=table_id, + ) + + ray_ds = lr.read_lance( + namespace_impl="dir", + namespace_properties={"root": temp_dir}, + table_id=table_id, + ) + updated_ds = ray_ds.map_batches( + lambda batch: batch.assign(id_2=batch["id"] * 2), + batch_format="pandas", + ) + + lr.write_lance( + updated_ds, + namespace_impl="dir", + namespace_properties={"root": temp_dir}, + table_id=table_id, + mode="overwrite", + ) + + result = lr.read_lance( + namespace_impl="dir", + namespace_properties={"root": temp_dir}, + table_id=table_id, + ).to_pandas() + result = result.sort_values("id").reset_index(drop=True) + + assert result["id"].tolist() == sample_data["id"].tolist() + assert result["id_2"].tolist() == (sample_data["id"] * 2).tolist() + class TestDatasetOptions: """Test cases for dataset options in LanceDataset.""" diff --git a/tests/test_fragment.py b/tests/test_fragment.py index 23eb5f5a..ce4eee25 100644 --- a/tests/test_fragment.py +++ b/tests/test_fragment.py @@ -272,6 +272,33 @@ def test_fragment_writer_append_mode(self, tmp_path: Path): tbl = ds.to_table() assert sorted(tbl["id"].to_pylist()) == list(range(10)) + @pytest.mark.filterwarnings("ignore::DeprecationWarning") + def test_fragment_writer_overwrite_mode_schema_change(self, tmp_path: Path): + """Writer+committer overwrite evolves the schema when the SAME mode is + passed to both (the matching mode is required; see class docs).""" + # Initial dataset with two columns. + ( + ray.data.range(4) + .map(lambda x: {"id": x["id"], "value": x["id"] * 10}) + .map_batches(LanceFragmentWriter(tmp_path)) + .write_datasink(LanceFragmentCommitter(tmp_path, mode="create")) + ) + + # Overwrite with an added column, passing mode="overwrite" to BOTH the + # writer (assigns new field ids) and the committer (Overwrite op). + ( + ray.data.range(4) + .map(lambda x: {"id": x["id"], "value": x["id"] * 10, "id_2": x["id"] * 2}) + .map_batches(LanceFragmentWriter(tmp_path, mode="overwrite")) + .write_datasink(LanceFragmentCommitter(tmp_path, mode="overwrite")) + ) + + ds = lance.dataset(tmp_path) + assert ds.schema.names == ["id", "value", "id_2"] + tbl = ds.to_table().sort_by("id") + assert tbl["id"].to_pylist() == [0, 1, 2, 3] + assert tbl["id_2"].to_pylist() == [0, 2, 4, 6] + @pytest.mark.filterwarnings("ignore::DeprecationWarning") def test_fragment_writer_empty_write(self, tmp_path: Path): """Test fragment writer with empty data."""