diff --git a/python/python/tests/test_vector_index.py b/python/python/tests/test_vector_index.py index 505d20798bb..f3cc43f5f3b 100644 --- a/python/python/tests/test_vector_index.py +++ b/python/python/tests/test_vector_index.py @@ -1747,6 +1747,8 @@ def test_index_cast_centroids(tmp_path): values = pa.array([x for arr in centroids for x in arr], pa.float32()) centroids = pa.FixedSizeListArray.from_arrays(values, 128) + # Cast invalidates the attached index; drop it first per the new contract. + dataset.drop_index(index_name) dataset.alter_columns(dict(path="vector", data_type=pa.list_(pa.float16(), 128))) # centroids are f32, but the column is now f16 diff --git a/rust/lance/src/dataset/schema_evolution.rs b/rust/lance/src/dataset/schema_evolution.rs index f5d792979df..e9f55d962e6 100644 --- a/rust/lance/src/dataset/schema_evolution.rs +++ b/rust/lance/src/dataset/schema_evolution.rs @@ -8,6 +8,7 @@ use super::{ Dataset, transaction::{Operation, Transaction}, }; +use crate::index::DatasetIndexExt; use crate::{Error, Result, io::exec::Planner}; use arrow::compute::CastOptions; use arrow::compute::can_cast_types; @@ -605,6 +606,41 @@ pub(super) async fn alter_columns( new_schema.validate()?; + // If any column being cast has an attached index, fail fast. Cast operations + // rewrite the underlying column data and silently invalidate any index on the + // affected column(s). The current behavior is to drop such indices without + // warning, which has caused production incidents where vector search silently + // regressed to brute-force scan. We require users to explicitly drop the + // index before altering the column type, so the action is never silent. + if !cast_fields.is_empty() { + let indices = dataset.load_indices().await?; + let affected: Vec<&lance_table::format::IndexMetadata> = indices + .iter() + .filter(|idx| { + cast_fields + .iter() + .any(|(old, _)| idx.fields.contains(&old.id)) + }) + .collect(); + if !affected.is_empty() { + let affected_cols: Vec = cast_fields + .iter() + .filter(|(old, _)| affected.iter().any(|i| i.fields.contains(&old.id))) + .map(|(old, _)| old.name.clone()) + .collect(); + let affected_idx_names: Vec = affected.iter().map(|i| i.name.clone()).collect(); + return Err(Error::invalid_input(format!( + "Cannot cast column(s) [{}] to a new type: they have {} index(es) \ + attached: [{}]. Cast rewrites column data and invalidates any index \ + on the affected column(s). Drop the index(es) with drop_index() \ + before altering, then recreate them after the cast completes.", + affected_cols.join(", "), + affected.len(), + affected_idx_names.join(", "), + ))); + } + } + // If we aren't casting a column, we don't need to touch the fragments. let transaction = if cast_fields.is_empty() { Transaction::new( @@ -1784,7 +1820,6 @@ mod test { ) -> Result<()> { // Create a table with 2 scalar columns, 1 vector column - use crate::index::DatasetIndexExt; use arrow::datatypes::{Int32Type, Int64Type}; use arrow_array::{Float16Array, Float32Array, Int64Array, ListArray}; use half::f16; @@ -1885,7 +1920,10 @@ mod test { assert_eq!(f.files.len(), 2); }); - // Cast scalar column with index, should not keep index (TODO: keep it) + // Cast scalar column with index. The index must be dropped first; cast + // is now a fail-fast operation when an index is attached, see + // test_alter_columns_cast_fails_with_attached_index for that path. + dataset.drop_index("i_idx").await?; dataset .alter_columns(&[ColumnAlteration::new("i".into()).cast_to(DataType::Int64)]) .await?; @@ -1906,7 +1944,8 @@ mod test { ]); assert_eq!(&ArrowSchema::from(dataset.schema()), &expected_schema); - // We currently lose the index when casting a column + // The scalar index on `i` is gone (we dropped it); the vector index on + // `vec` is still present. let indices = dataset.load_indices().await?; assert_eq!(indices.len(), 1); @@ -1915,7 +1954,8 @@ mod test { assert_eq!(f.files.len(), 3); }); - // Cast vector column, should not keep index (TODO: keep it) + // Cast vector column. Drop its index first (same reason as above). + dataset.drop_index("vec_idx").await?; dataset .alter_columns(&[ ColumnAlteration::new("vec".into()).cast_to(DataType::FixedSizeList( @@ -1983,6 +2023,120 @@ mod test { Ok(()) } + /// Cast on a column with an attached index must fail fast rather than + /// silently dropping the index. This guards against the historical behavior + /// where cast would rewrite column data and the index would vanish without + /// any error or warning, causing vector search to silently regress to a + /// brute-force scan. + #[rstest] + #[tokio::test] + async fn test_alter_columns_cast_fails_with_attached_index( + #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)] + data_storage_version: LanceFileVersion, + ) -> Result<()> { + use lance_arrow::FixedSizeListArrayExt; + use lance_index::IndexType; + use lance_linalg::distance::MetricType; + use lance_testing::datagen::generate_random_array; + + use crate::index::vector::VectorIndexParams; + + // Build a small dataset with one indexed vector column. + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "vec", + DataType::FixedSizeList( + Arc::new(ArrowField::new("item", DataType::Float32, true)), + 64, + ), + false, + )])); + let nrows = 256; + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new( + ::try_new_from_values( + generate_random_array(64 * nrows as usize), + 64, + ) + .unwrap(), + )], + )?; + + let test_dir = TempStrDir::default(); + let mut dataset = Dataset::write( + RecordBatchIterator::new(vec![Ok(batch)], schema.clone()), + &test_dir, + Some(WriteParams { + data_storage_version: Some(data_storage_version), + ..Default::default() + }), + ) + .await?; + + // Build an IVF_PQ index on the vector column. + let params = VectorIndexParams::ivf_pq(4, 8, 8, MetricType::L2, 50); + dataset + .create_index(&["vec"], IndexType::Vector, None, ¶ms, false) + .await?; + + let indices_before = dataset.load_indices().await?; + assert_eq!(indices_before.len(), 1, "precondition: index exists"); + let index_name = indices_before[0].name.clone(); + + // Attempting to cast the indexed column must fail with a clear message + // that names the offending index(es). + let result = dataset + .alter_columns(&[ + ColumnAlteration::new("vec".into()).cast_to(DataType::FixedSizeList( + Arc::new(ArrowField::new("item", DataType::Float16, true)), + 64, + )), + ]) + .await; + let err = result.expect_err("cast on indexed column should fail"); + let msg = err.to_string(); + assert!( + msg.contains("vec") && msg.contains(&index_name), + "error should mention column and index name, got: {msg}" + ); + assert!( + msg.contains("drop_index"), + "error should suggest the remediation, got: {msg}" + ); + + // The dataset must be unchanged: schema is still float32, index still present. + assert_eq!( + dataset.schema().field("vec").unwrap().data_type(), + DataType::FixedSizeList( + Arc::new(ArrowField::new("item", DataType::Float32, true)), + 64, + ), + ); + let indices_after = dataset.load_indices().await?; + assert_eq!(indices_after.len(), 1, "index should still exist"); + assert_eq!(indices_after[0].name, index_name); + + // Sanity check: after dropping the index, the same cast should succeed. + dataset.drop_index(&index_name).await?; + dataset + .alter_columns(&[ + ColumnAlteration::new("vec".into()).cast_to(DataType::FixedSizeList( + Arc::new(ArrowField::new("item", DataType::Float16, true)), + 64, + )), + ]) + .await?; + assert_eq!( + dataset.schema().field("vec").unwrap().data_type(), + DataType::FixedSizeList( + Arc::new(ArrowField::new("item", DataType::Float16, true)), + 64, + ), + ); + + Ok(()) + } + #[rstest] #[tokio::test] async fn test_drop_columns(