3 changes: 3 additions & 0 deletions .gitignore
@@ -205,3 +205,6 @@ cython_debug/
marimo/_static/
marimo/_lsp/
__marimo__/

# CodeGraph index (local dev tooling)
.codegraph/
129 changes: 127 additions & 2 deletions docs/src/compaction.md
@@ -1,8 +1,10 @@
# Distributed Compaction
# Distributed Compaction and Cleanup

As Lance datasets evolve over time (e.g., frequent appends / overwrites), they can accumulate many small fragments. Compaction rewrites fragments into fewer, larger fragments to improve scan and query performance.

Lance-Ray provides a distributed compaction workflow backed by Ray workers.
Lance-Ray provides distributed maintenance workflows backed by Ray workers.
Compaction can split one table into multiple compaction tasks; cleanup removes
old versions, running one task per table across the tables in a namespace.

## `compact_files`

@@ -66,6 +68,96 @@ This function lists tables under `database` via the namespace API and runs `comp

**Returns:** A list of dictionaries, one per table, with keys `table_id` (the full table identifier) and `metrics` (the compaction result, or `None` if no compaction was needed).

## `cleanup_old_versions`

```python
cleanup_old_versions(
    uri=None,
    *,
    table_id=None,
    older_than=None,
    retain_versions=None,
    delete_unverified=False,
    error_if_tagged_old_versions=True,
    delete_rate_limit=None,
    storage_options=None,
    namespace_impl=None,
    namespace_properties=None,
)
```

Clean old dataset versions in a single Lance table. The function delegates
deletion planning and safety checks to Lance core.

**Parameters:**

- `uri`: Dataset URI to clean (either `uri` OR `namespace_impl` + `table_id` required)
- `table_id`: Table identifier as a list of strings (requires `namespace_impl`)
- `older_than`: Optional `datetime.timedelta`; versions older than this may be removed
- `retain_versions`: Optional number of latest versions to retain
- `delete_unverified`: Delete unverified files without Lance's default (7-day) age guard. Only use this when no other process is writing to the dataset.
- `error_if_tagged_old_versions`: Raise if tagged versions match the cleanup policy (default: `True`)
- `delete_rate_limit`: Optional maximum delete operations per second
- `storage_options`: Optional storage configuration dictionary
- `namespace_impl`: Namespace implementation type (e.g., `"rest"`, `"dir"`)
- `namespace_properties`: Properties for connecting to the namespace

**Returns:** `CleanupStats`.
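
The targeting rule in the parameter list (`uri`, or `namespace_impl` together with `table_id`) can be pictured with a small stand-alone check. This is only a sketch: `check_target` is a hypothetical helper, not part of `lance_ray`, and the precedence when both forms are supplied is an assumption because it is not documented here.

```python
def check_target(uri=None, *, table_id=None, namespace_impl=None):
    """Hypothetical helper: validate the two documented targeting rules."""
    # Rule 1: table_id is only meaningful together with namespace_impl.
    if table_id is not None and namespace_impl is None:
        raise ValueError("table_id requires namespace_impl")
    # Rule 2: either a uri, or namespace_impl plus table_id, must be given.
    if uri is None and (namespace_impl is None or table_id is None):
        raise ValueError("pass either uri, or namespace_impl together with table_id")
    # Assumption: when both forms are supplied this sketch reports "uri";
    # the real precedence is not stated in this document.
    return "uri" if uri is not None else "namespace"


by_uri = check_target(uri="/path/to/table.lance")
by_namespace = check_target(table_id=["my_db", "events"], namespace_impl="dir")
```

Here `by_uri` and `by_namespace` name which form was used; a call with only `table_id`, or with no arguments, raises `ValueError`.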

## `cleanup_database_old_versions`

```python
cleanup_database_old_versions(
    *,
    database,
    namespace_impl,
    namespace_properties=None,
    older_than=None,
    retain_versions=None,
    delete_unverified=False,
    error_if_tagged_old_versions=True,
    delete_rate_limit=None,
    num_workers=4,
    storage_options=None,
    ray_remote_args=None,
)
```

Clean old versions for all tables under a database (namespace). This function
lists tables under `database` via the namespace API and runs one table cleanup
task per table using a Ray Pool.

Unlike `compact_database` (which processes tables serially and fails fast on the
first error), cleanup runs tables **in parallel** and **aggregates** per-table
errors: every table is attempted, and a single error summarizing all failures is
raised only after the pool finishes. `num_workers` therefore bounds concurrency
*across tables*, not within a single table.

!!! warning

    This operation is destructive and **not atomic**. Tables are cleaned eagerly
    by workers, so when it raises for a failed table, other tables may already
    have had old versions deleted.
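
The "attempt every table, then raise once" behavior described above can be sketched without Ray. The example below is an illustration, not the `lance_ray` implementation: it uses `concurrent.futures.ThreadPoolExecutor` as a stand-in for the Ray pool, `cleanup_all` and `fake_cleanup` are hypothetical names, and `RuntimeError` is an assumed exception type.

```python
from concurrent.futures import ThreadPoolExecutor


def cleanup_all(tables, cleanup_one, num_workers=4):
    """Attempt every table, then raise one error summarizing all failures."""
    results, failures = [], []

    def attempt(table_id):
        # Capture the exception so one failure does not stop other tables.
        try:
            return table_id, cleanup_one(table_id), None
        except Exception as exc:
            return table_id, None, exc

    # num_workers bounds concurrency across tables, not within a table.
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        for table_id, stats, exc in pool.map(attempt, tables):
            if exc is None:
                results.append({"table_id": table_id, "stats": stats})
            else:
                failures.append((table_id, exc))

    if failures:
        summary = "; ".join(f"{'.'.join(t)}: {e}" for t, e in failures)
        raise RuntimeError(f"cleanup failed for {len(failures)} table(s): {summary}")
    return results


cleaned = []  # records which tables were actually cleaned


def fake_cleanup(table_id):
    # Hypothetical per-table cleanup that fails for exactly one table.
    if table_id[-1] == "bad":
        raise ValueError("tagged old version")
    cleaned.append(table_id)
    return {"old_versions": 2}


tables = [["my_db", "a"], ["my_db", "bad"], ["my_db", "c"]]
try:
    cleanup_all(tables, fake_cleanup)
    error_message = None
except RuntimeError as err:
    error_message = str(err)
```

After this runs, `error_message` describes the one failed table while `cleaned` still contains the other two, which is the non-atomic outcome the warning describes.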

**Parameters:**

- `database`: Database (namespace) identifier as a list of path segments, e.g. `['my_database']`
- `namespace_impl`: Namespace implementation type (e.g., `"rest"`, `"dir"`)
- `namespace_properties`: Properties for connecting to the namespace
- `older_than`: Optional `datetime.timedelta`; versions older than this may be removed
- `retain_versions`: Optional number of latest versions to retain
- `delete_unverified`: Delete unverified files without Lance's default (7-day) age guard. Only use this when no other process is writing to the datasets.
- `error_if_tagged_old_versions`: Raise if tagged versions match the cleanup policy (default: `True`)
- `delete_rate_limit`: Optional maximum delete operations per second per table
- `num_workers`: Number of Ray workers; bounds how many tables are cleaned concurrently (default: 4)
- `storage_options`: Optional storage configuration dictionary
- `ray_remote_args`: Optional kwargs for Ray remote tasks

**Returns:** A list of dictionaries, one per table, with keys `table_id` and
`stats`. `stats` is a plain dictionary with the cleanup counters returned by
Lance: `bytes_removed`, `old_versions`, `data_files_removed`,
`transaction_files_removed`, `index_files_removed`, and `deletion_files_removed`.
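
Because each `stats` entry is a plain dictionary of counters, per-table results can be summed into database-wide totals. The sketch below assumes the return shape described above; the `results` values are made-up sample data and `total_stats` is a hypothetical helper.

```python
from collections import Counter

# Made-up sample data shaped like the documented return value.
results = [
    {
        "table_id": ["my_db", "events"],
        "stats": {
            "bytes_removed": 1024,
            "old_versions": 3,
            "data_files_removed": 2,
            "transaction_files_removed": 3,
            "index_files_removed": 0,
            "deletion_files_removed": 1,
        },
    },
    {
        "table_id": ["my_db", "users"],
        "stats": {
            "bytes_removed": 2048,
            "old_versions": 1,
            "data_files_removed": 4,
            "transaction_files_removed": 1,
            "index_files_removed": 1,
            "deletion_files_removed": 0,
        },
    },
]


def total_stats(results):
    """Sum every counter across all per-table stats dictionaries."""
    totals = Counter()
    for item in results:
        totals.update(item["stats"])  # adds values key by key
    return dict(totals)


totals = total_stats(results)
```

With the sample data, `totals["bytes_removed"]` is 3072 and `totals["old_versions"]` is 4.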

## Examples

### Compact a single table by URI
@@ -112,3 +204,36 @@ results = lr.compact_database(
for item in results:
    print(item["table_id"], item["metrics"])
```

### Clean old versions for one table

```python
from datetime import timedelta
import lance_ray as lr

stats = lr.cleanup_old_versions(
    uri="/path/to/table.lance",
    older_than=timedelta(days=7),
    retain_versions=3,
)
print(stats)
```

### Clean old versions across a database

```python
from datetime import timedelta
import lance_ray as lr

results = lr.cleanup_database_old_versions(
    database=["my_db"],
    namespace_impl="dir",
    namespace_properties={"root": "/path/to/tables"},
    older_than=timedelta(days=7),
    retain_versions=3,
    num_workers=4,
)

for item in results:
    print(item["table_id"], item["stats"])
```
3 changes: 3 additions & 0 deletions lance_ray/__init__.py
@@ -8,6 +8,7 @@
__version__ = "0.4.2"
__author__ = "LanceDB Devs"
__email__ = "dev@lancedb.com"
from .cleanup import cleanup_database_old_versions, cleanup_old_versions
from .compaction import compact_database, compact_files

# Main imports
@@ -42,6 +43,8 @@
    "optimize_indices",
    "compact_files",
    "compact_database",
    "cleanup_old_versions",
    "cleanup_database_old_versions",
    "LanceFragmentWriter",
    "LanceFragmentCommitter",
]