Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/src/write.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Write a Ray Dataset to Lance format.
- `namespace`: LanceNamespace instance for metadata catalog integration (requires table_id)
- `table_id`: Table identifier as list of strings (requires namespace)
- `schema`: Optional PyArrow schema
- `mode`: Write mode - "create", "append", or "overwrite"
- `mode`: Write mode - "create" (default), "append", or "overwrite". "create" and "overwrite" write data using the incoming schema, so new columns are persisted (schema evolution); "append" adds data under the existing dataset schema without evolving it (in the default non-streaming write, columns not in the existing schema are dropped; with `stream=True` they raise a schema-mismatch error)
- `target_bases`: Optional list of registered base names or base path URIs where new data files should be written. In `create` mode, entries must match `initial_bases`; in `append` and `overwrite` modes, entries must match bases already registered in the dataset manifest
- `min_rows_per_file`: Minimum rows per file (default: 1024 * 1024)
- `max_rows_per_file`: Maximum rows per file (default: 64 * 1024 * 1024)
Expand Down
29 changes: 26 additions & 3 deletions lance_ray/datasink.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,26 @@ def on_write_complete(
for fragment_str, schema_str in batch:
fragment = pickle.loads(fragment_str)
fragments.append(fragment)
schema = pickle.loads(schema_str)
frag_schema = pickle.loads(schema_str)
# In create/overwrite mode pylance assigns field ids by column
# position, so every block must share one column order. If
# blocks differ (e.g. a union of differently ordered sources),
# committing their fragments under a single schema would read
# the data back transposed. Fail loudly instead of silently
# corrupting; an explicit ``schema`` aligns every block by name.
if (
schema is not None
and self.mode in {"create", "overwrite"}
and frag_schema.names != schema.names
):
raise ValueError(
"Distributed create/overwrite received blocks with "
f"inconsistent column order ({frag_schema.names} vs "
f"{schema.names}); their fragments would be committed "
"under a single schema and read back transposed. Pass "
"an explicit `schema=` so every block is aligned by name."
)
schema = frag_schema
# Check weather writer has fragments or not.
# Skip commit when there are no fragments.
if not schema:
Expand Down Expand Up @@ -239,8 +258,11 @@ class LanceDatasink(_BaseLanceDatasink):
schema : pyarrow.Schema, optional.
The schema of the dataset.
mode : str, optional
The write mode. Default is 'append'.
Choices are 'append', 'create', 'overwrite'.
The write mode. Default is 'create'.
Choices are 'create', 'append', 'overwrite'. "create" and "overwrite"
write data using the incoming schema (schema evolution), while
"append" validates against the existing dataset schema and drops
columns not present in it.
min_rows_per_file : int, optional
The minimum number of rows per file. Default is 1024 * 1024.
max_rows_per_file : int, optional
Expand Down Expand Up @@ -367,6 +389,7 @@ def write(
blocks,
self.uri,
schema=self.schema,
mode=self.mode,
max_rows_per_file=self.max_rows_per_file,
data_storage_version=self.data_storage_version,
storage_options=self.storage_options,
Expand Down
51 changes: 51 additions & 0 deletions lance_ray/fragment.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def write_fragment(
uri: str,
*,
schema: Optional[pa.Schema] = None,
mode: Literal["create", "append", "overwrite"] = "append",
max_rows_per_file: int = 64 * 1024 * 1024,
max_bytes_per_file: Optional[int] = None,
max_rows_per_group: int = 1024, # Only useful for v1 writer.
Expand All @@ -55,6 +56,36 @@ def write_fragment(
table_id: Optional[list[str]] = None,
retry_params: Optional[dict[str, Any]] = None,
) -> list[tuple["FragmentMetadata", pa.Schema]]:
"""Write a stream of blocks into one or more uncommitted Lance fragments.

The returned fragments are not yet part of any dataset version; callers
commit them via a ``LanceOperation`` (e.g. ``Append`` or ``Overwrite``).

Parameters
----------
stream : iterable of pyarrow.Table or pandas.DataFrame
The blocks of data to write. An empty stream yields no fragments.
uri : str
The dataset URI the fragments are written for.
schema : pyarrow.Schema, optional
The schema of the data. If None, it is inferred from the first block.
mode : {"create", "append", "overwrite"}, default "append"
Fragment write mode passed to pylance. "append" validates fragments
against the existing dataset schema (rejecting columns not present in
it); "create" and "overwrite" assign new field ids for the incoming schema
(schema evolution). Note the high-level ``write_lance`` defaults to
"create", while this lower-level helper defaults to "append" to preserve
append-validation semantics for direct callers.

Returns
-------
list of (FragmentMetadata, pyarrow.Schema)
One entry per written fragment, paired with the schema to commit.

Notes
-----
The remaining parameters mirror :class:`LanceFragmentWriter`.
"""
from lance.dependencies import _PANDAS_AVAILABLE
from lance.dependencies import pandas as pd
from lance.fragment import DEFAULT_MAX_BYTES_PER_FILE, write_fragments
Expand Down Expand Up @@ -122,6 +153,7 @@ def record_batch_converter():
reader,
uri,
schema=schema,
mode=mode,
max_rows_per_file=max_rows_per_file,
max_rows_per_group=max_rows_per_group,
max_bytes_per_file=max_bytes_per_file,
Expand Down Expand Up @@ -280,6 +312,13 @@ class LanceFragmentWriter:
The version of the data storage format to use. Newer versions are more
efficient but require newer versions of lance to read. The default
(None) will use the 2.0 version. See the user guide for more details.
mode : {"create", "append", "overwrite"}, default "append"
Fragment write mode passed to pylance. "append" validates fragments
against the existing dataset schema (rejecting columns not present in
it); "create" and "overwrite" assign new field ids for the incoming schema
(schema evolution). Note the high-level ``write_lance`` defaults to
"create"; this writer defaults to "append" to preserve append-validation
semantics for direct callers.
use_legacy_format : optional, bool, default None
Deprecated method for setting the data storage version. Use the
`data_storage_version` parameter instead.
Expand Down Expand Up @@ -313,6 +352,15 @@ class LanceFragmentWriter:
If provided, should contain keys like 'description', 'match',
'max_attempts', and 'max_backoff_s'.

Notes
-----
When pairing this writer with :class:`LanceFragmentCommitter`, pass the
SAME ``mode`` to both: this writer assigns field ids (so "create"/
"overwrite" enable schema evolution), while the committer selects the
commit operation (append vs overwrite). Using the default ``mode="append"``
writer with a ``mode="overwrite"`` committer on an evolved schema raises a
schema-mismatch error rather than evolving the schema.

"""

def __init__(
Expand All @@ -325,6 +373,7 @@ def __init__(
max_bytes_per_file: Optional[int] = None,
max_rows_per_group: Optional[int] = None, # Only useful for v1 writer.
data_storage_version: Optional[str] = None,
mode: Literal["create", "append", "overwrite"] = "append",
use_legacy_format: Optional[bool] = False,
storage_options: Optional[dict[str, Any]] = None,
base_store_params: Optional[dict[str, dict[str, Any]]] = None,
Expand Down Expand Up @@ -363,6 +412,7 @@ def __init__(
self.max_rows_per_file = max_rows_per_file
self.max_bytes_per_file = max_bytes_per_file
self.data_storage_version = data_storage_version
self.mode = mode
self.storage_options = storage_options
self.base_store_params = base_store_params
self.initial_bases = normalize_initial_bases(initial_bases)
Expand Down Expand Up @@ -402,6 +452,7 @@ def __call__(self, batch: Union[pa.Table, "pd.DataFrame", dict]) -> pa.Table:
transformed,
self.uri,
schema=self.schema,
mode=self.mode,
max_rows_per_file=self.max_rows_per_file,
max_rows_per_group=self.max_rows_per_group,
max_bytes_per_file=self.max_bytes_per_file,
Expand Down
19 changes: 17 additions & 2 deletions lance_ray/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,18 @@ def write_lance(
with namespace parameters when creating a new dataset (mode='create' or 'overwrite').
table_id: The table identifier as a list of strings. Must be provided together
with namespace_impl and namespace_properties.
schema: The schema of the dataset. If not provided, it is inferred from the data.
mode: The write mode. Can be "create", "append", or "overwrite".
schema: The schema of the dataset. If not provided, it is inferred from the
data. For distributed create/overwrite writes, provide a schema when
input blocks may differ in column order (e.g. a union of differently
ordered sources); otherwise the write raises rather than risk writing
transposed data.
mode: The write mode (default "create"). Can be "create", "append", or
"overwrite". "create" writes a new dataset and "overwrite" replaces an
existing one; both write data using the schema of the incoming data, so
new columns are persisted (schema evolution). "append" adds data under
the existing dataset schema and does not evolve it: in the default
non-streaming write, columns not present in the existing schema are
dropped, while with ``stream=True`` they raise a schema-mismatch error.
min_rows_per_file: The minimum number of rows per file.
max_rows_per_file: The maximum number of rows per file.
data_storage_version: The version of the data storage format to use. Newer versions are more
Expand Down Expand Up @@ -341,12 +351,17 @@ def write_lance(
fragment_initial_bases = (
initial_bases if mode == "create" and not first_commit_done else None
)
# Only the first committed batch carries the requested mode so that
# create/overwrite assign new field ids for the incoming schema. Every
# later batch appends to the dataset just created/overwritten above.
fragment_mode = "append" if first_commit_done else mode
writer = LanceFragmentWriter(
uri=dest_uri,
schema=schema, # if None, writer infers from first batch (preserves Arrow metadata)
max_rows_per_file=max_rows_per_file,
max_rows_per_group=min_rows_per_file, # keep naming aligned with v1 semantics
data_storage_version=data_storage_version,
mode=fragment_mode,
storage_options=storage_options,
base_store_params=base_store_params,
initial_bases=fragment_initial_bases,
Expand Down
7 changes: 7 additions & 0 deletions lance_ray/pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,12 @@ def pd_to_arrow(
new_table = tbl.replace_schema_metadata(new_schema.metadata)
return new_table
elif isinstance(df, pa.Table) and df.num_rows > 0 and schema is not None:
# Align columns to the target schema by NAME before casting. A bare
# df.cast(schema) requires the table's field names to already match the
# schema's names in order and otherwise raises ValueError. Selecting by
# name first reorders/projects the columns so the cast lines the right
# data up with each field.
if df.schema.names != schema.names:
df = df.select(schema.names)
return df.cast(schema)
return df
Loading
Loading