Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions python/python/tests/test_vector_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -1747,6 +1747,8 @@ def test_index_cast_centroids(tmp_path):
values = pa.array([x for arr in centroids for x in arr], pa.float32())
centroids = pa.FixedSizeListArray.from_arrays(values, 128)

# Cast invalidates the attached index; drop it first per the new contract.
dataset.drop_index(index_name)
dataset.alter_columns(dict(path="vector", data_type=pa.list_(pa.float16(), 128)))

# centroids are f32, but the column is now f16
Expand Down
162 changes: 158 additions & 4 deletions rust/lance/src/dataset/schema_evolution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use super::{
Dataset,
transaction::{Operation, Transaction},
};
use crate::index::DatasetIndexExt;
use crate::{Error, Result, io::exec::Planner};
use arrow::compute::CastOptions;
use arrow::compute::can_cast_types;
Expand Down Expand Up @@ -605,6 +606,41 @@ pub(super) async fn alter_columns(

new_schema.validate()?;

// If any column being cast has an attached index, fail fast. A cast rewrites
// the underlying column data, which invalidates every index on the affected
// column(s). Previously such indices were dropped without any warning, which
// caused production incidents where vector search silently regressed to a
// brute-force scan. Users must now explicitly drop the index before altering
// the column type, so an index is never lost silently.
if !cast_fields.is_empty() {
let indices = dataset.load_indices().await?;
let affected: Vec<&lance_table::format::IndexMetadata> = indices
.iter()
.filter(|idx| {
cast_fields
.iter()
.any(|(old, _)| idx.fields.contains(&old.id))
})
.collect();
if !affected.is_empty() {
let affected_cols: Vec<String> = cast_fields
.iter()
.filter(|(old, _)| affected.iter().any(|i| i.fields.contains(&old.id)))
.map(|(old, _)| old.name.clone())
.collect();
let affected_idx_names: Vec<String> = affected.iter().map(|i| i.name.clone()).collect();
return Err(Error::invalid_input(format!(
"Cannot cast column(s) [{}] to a new type: they have {} index(es) \
attached: [{}]. Cast rewrites column data and invalidates any index \
on the affected column(s). Drop the index(es) with drop_index() \
before altering, then recreate them after the cast completes.",
affected_cols.join(", "),
affected.len(),
affected_idx_names.join(", "),
)));
}
}

// If we aren't casting a column, we don't need to touch the fragments.
let transaction = if cast_fields.is_empty() {
Transaction::new(
Expand Down Expand Up @@ -1784,7 +1820,6 @@ mod test {
) -> Result<()> {
// Create a table with 2 scalar columns, 1 vector column

use crate::index::DatasetIndexExt;
use arrow::datatypes::{Int32Type, Int64Type};
use arrow_array::{Float16Array, Float32Array, Int64Array, ListArray};
use half::f16;
Expand Down Expand Up @@ -1885,7 +1920,10 @@ mod test {
assert_eq!(f.files.len(), 2);
});

// Cast scalar column with index, should not keep index (TODO: keep it)
// Cast scalar column with index. The index must be dropped first; cast
// is now a fail-fast operation when an index is attached, see
// test_alter_columns_cast_fails_with_attached_index for that path.
dataset.drop_index("i_idx").await?;
dataset
.alter_columns(&[ColumnAlteration::new("i".into()).cast_to(DataType::Int64)])
.await?;
Expand All @@ -1906,7 +1944,8 @@ mod test {
]);
assert_eq!(&ArrowSchema::from(dataset.schema()), &expected_schema);

// We currently lose the index when casting a column
// The scalar index on `i` is gone (we dropped it); the vector index on
// `vec` is still present.
let indices = dataset.load_indices().await?;
assert_eq!(indices.len(), 1);

Expand All @@ -1915,7 +1954,8 @@ mod test {
assert_eq!(f.files.len(), 3);
});

// Cast vector column, should not keep index (TODO: keep it)
// Cast vector column. Drop its index first (same reason as above).
dataset.drop_index("vec_idx").await?;
dataset
.alter_columns(&[
ColumnAlteration::new("vec".into()).cast_to(DataType::FixedSizeList(
Expand Down Expand Up @@ -1983,6 +2023,120 @@ mod test {
Ok(())
}

/// Cast on a column with an attached index must fail fast rather than
/// silently dropping the index. This guards against the historical behavior
/// where cast would rewrite column data and the index would vanish without
/// any error or warning, causing vector search to silently regress to a
/// brute-force scan.
#[rstest]
#[tokio::test]
async fn test_alter_columns_cast_fails_with_attached_index(
#[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
data_storage_version: LanceFileVersion,
) -> Result<()> {
use lance_arrow::FixedSizeListArrayExt;
use lance_index::IndexType;
use lance_linalg::distance::MetricType;
use lance_testing::datagen::generate_random_array;

use crate::index::vector::VectorIndexParams;

// Build a small dataset with one indexed vector column.
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"vec",
DataType::FixedSizeList(
Arc::new(ArrowField::new("item", DataType::Float32, true)),
64,
),
false,
)]));
let nrows = 256;
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(
<arrow_array::FixedSizeListArray as FixedSizeListArrayExt>::try_new_from_values(
generate_random_array(64 * nrows as usize),
64,
)
.unwrap(),
)],
)?;

let test_dir = TempStrDir::default();
let mut dataset = Dataset::write(
RecordBatchIterator::new(vec![Ok(batch)], schema.clone()),
&test_dir,
Some(WriteParams {
data_storage_version: Some(data_storage_version),
..Default::default()
}),
)
.await?;

// Build an IVF_PQ index on the vector column.
let params = VectorIndexParams::ivf_pq(4, 8, 8, MetricType::L2, 50);
dataset
.create_index(&["vec"], IndexType::Vector, None, &params, false)
.await?;

let indices_before = dataset.load_indices().await?;
assert_eq!(indices_before.len(), 1, "precondition: index exists");
let index_name = indices_before[0].name.clone();

// Attempting to cast the indexed column must fail with a clear message
// that names the offending index(es).
let result = dataset
.alter_columns(&[
ColumnAlteration::new("vec".into()).cast_to(DataType::FixedSizeList(
Arc::new(ArrowField::new("item", DataType::Float16, true)),
64,
)),
])
.await;
let err = result.expect_err("cast on indexed column should fail");
let msg = err.to_string();
assert!(
msg.contains("vec") && msg.contains(&index_name),
"error should mention column and index name, got: {msg}"
);
assert!(
msg.contains("drop_index"),
"error should suggest the remediation, got: {msg}"
);

// The dataset must be unchanged: schema is still float32, index still present.
assert_eq!(
dataset.schema().field("vec").unwrap().data_type(),
DataType::FixedSizeList(
Arc::new(ArrowField::new("item", DataType::Float32, true)),
64,
),
);
let indices_after = dataset.load_indices().await?;
assert_eq!(indices_after.len(), 1, "index should still exist");
assert_eq!(indices_after[0].name, index_name);

// Sanity check: after dropping the index, the same cast should succeed.
dataset.drop_index(&index_name).await?;
dataset
.alter_columns(&[
ColumnAlteration::new("vec".into()).cast_to(DataType::FixedSizeList(
Arc::new(ArrowField::new("item", DataType::Float16, true)),
64,
)),
])
.await?;
assert_eq!(
dataset.schema().field("vec").unwrap().data_type(),
DataType::FixedSizeList(
Arc::new(ArrowField::new("item", DataType::Float16, true)),
64,
),
);

Ok(())
}

#[rstest]
#[tokio::test]
async fn test_drop_columns(
Expand Down
Loading