diff --git a/python/python/tests/test_blob.py b/python/python/tests/test_blob.py index 5a896d21c5d..9a0ad07f637 100644 --- a/python/python/tests/test_blob.py +++ b/python/python/tests/test_blob.py @@ -45,6 +45,56 @@ def _external_blob_table(blob_path, payload=b"hello"): return pa.table({"blob": lance.blob_array([blob_path.as_uri()])}) +def _add_columns_blob_v2_values(tmp_path): + external_base = tmp_path / "external_base" + external_blob = external_base / "external_blob.bin" + external_blob.parent.mkdir(parents=True, exist_ok=True) + external_blob.write_bytes(b"external") + + payloads = [ + b"inline", + b"p" * (64 * 1024 + 1024), + b"d" * (4 * 1024 * 1024 + 1024), + b"external", + ] + values = [payloads[0], payloads[1], payloads[2], external_blob.as_uri()] + initial_bases = [DatasetBasePath(external_base.as_uri(), name="external", id=1)] + return values, payloads, initial_bases + + +def _assert_blob_v2_add_columns_result(dataset, column, payloads): + desc = dataset.to_table(columns=[column]).column(column).chunk(0) + + assert desc.field("kind").to_pylist() == [0, 1, 2, 3] + assert desc.field("blob_id").to_pylist()[3] == 1 + assert desc.field("blob_uri").to_pylist()[3] == "external_blob.bin" + + blobs = dataset.take_blobs(column, indices=range(len(payloads))) + assert [blob.readall() for blob in blobs] == payloads + + +def _dataset_file_set(dataset_path): + return { + path.relative_to(dataset_path) + for path in dataset_path.rglob("*") + if path.is_file() + } + + +def _write_two_fragment_blob_v2_seed_dataset(tmp_path, name): + values, payloads, initial_bases = _add_columns_blob_v2_values(tmp_path) + dataset_path = tmp_path / name + ds = lance.write_dataset( + pa.table({"id": range(8)}), + dataset_path, + data_storage_version="2.2", + initial_bases=initial_bases, + max_rows_per_file=4, + max_rows_per_group=4, + ) + return ds, dataset_path, values, payloads + + def _out_of_order_blob_selection(dataset_with_blobs, selection_kind): addresses = _blob_row_addresses(dataset_with_blobs) expected = [(addresses[4], b"quux"), (addresses[0], b"foo")] @@ -608,6 +658,137 @@ def test_blob_extension_write_external_ingest_rejects_reference_only_options(tmp ) +def test_blob_extension_add_columns_record_batch_reader_all_kinds(tmp_path): + values, payloads, initial_bases = _add_columns_blob_v2_values(tmp_path) + ds = lance.write_dataset( + pa.table({"id": range(4)}), + tmp_path / "test_add_columns_reader_blob_v2", + data_storage_version="2.2", + initial_bases=initial_bases, + ) + + ds.add_columns(pa.table({"blob": lance.blob_array(values)}).to_reader()) + + _assert_blob_v2_add_columns_result(ds, "blob", payloads) + + +@pytest.mark.parametrize( + "failure_mode", + [ + pytest.param("raises_after_first_fragment", id="reader_raises_mid_stream"), + pytest.param("wrong_schema", id="reader_yields_wrong_schema"), + pytest.param("too_many_rows", id="reader_produces_too_many_rows"), + ], +) +def test_blob_extension_add_columns_record_batch_reader_failure_cleans_files( + tmp_path, + failure_mode, +): + ds, dataset_path, values, payloads = _write_two_fragment_blob_v2_seed_dataset( + tmp_path, + f"test_add_columns_reader_blob_v2_fail_cleanup_{failure_mode}", + ) + external_blob_path = tmp_path / "external_base" / "external_blob.bin" + files_before = _dataset_file_set(dataset_path) + + schema = pa.schema([lance.blob_field("blob")]) + first_fragment_batch = pa.record_batch([lance.blob_array(values)], schema=schema) + second_fragment_batch = pa.record_batch([lance.blob_array(values)], schema=schema) + + if failure_mode == "raises_after_first_fragment": + match = "reader failed after first fragment" + + def failing_reader(): + yield first_fragment_batch + raise RuntimeError("reader failed after first fragment") + + elif failure_mode == "wrong_schema": + match = "field names" + + def failing_reader(): + yield first_fragment_batch + yield pa.record_batch([pa.array(range(4))], ["not_blob"]) + + else: + match = "Stream produced more values than expected for dataset" + + def failing_reader(): + yield first_fragment_batch + yield second_fragment_batch + yield pa.record_batch([lance.blob_array([payloads[0]])], schema=schema) + + with pytest.raises(OSError, match=match): + ds.add_columns(failing_reader(), reader_schema=schema) + + assert ds.version == 1 + assert _dataset_file_set(dataset_path) == files_before + assert external_blob_path.exists() + + +def test_blob_extension_add_columns_batch_udf_failure_cleans_files(tmp_path): + ds, dataset_path, values, _ = _write_two_fragment_blob_v2_seed_dataset( + tmp_path, + "test_add_columns_udf_blob_v2_fail_cleanup", + ) + external_blob_path = tmp_path / "external_base" / "external_blob.bin" + files_before = _dataset_file_set(dataset_path) + call_count = 0 + + @lance.batch_udf(output_schema=pa.schema([lance.blob_field("blob")])) + def fail_on_second_fragment(batch): + nonlocal call_count + call_count += 1 + if call_count == 2: + raise RuntimeError("udf failed after first fragment") + blob_values = [values[row.as_py() % len(values)] for row in batch["id"]] + return pa.record_batch( + [lance.blob_array(blob_values)], + ["blob"], + ) + + with pytest.raises(OSError, match="udf failed after first fragment"): + ds.add_columns(fail_on_second_fragment, read_columns=["id"], batch_size=4) + + assert call_count == 2 + assert ds.version == 1 + assert _dataset_file_set(dataset_path) == files_before + assert external_blob_path.exists() + + +def test_blob_extension_add_columns_batch_udf_all_kinds(tmp_path): + values, payloads, initial_bases = _add_columns_blob_v2_values(tmp_path) + ds = lance.write_dataset( + pa.table({"id": range(4)}), + tmp_path / "test_add_columns_udf_blob_v2", + data_storage_version="2.2", + initial_bases=initial_bases, + ) + + @lance.batch_udf(output_schema=pa.schema([lance.blob_field("blob")])) + def make_blob_column(batch): + return pa.record_batch( + [lance.blob_array([values[row.as_py()] for row in batch["id"]])], + ["blob"], + ) + + ds.add_columns(make_blob_column, read_columns=["id"]) + + _assert_blob_v2_add_columns_result(ds, "blob", payloads) + + +def test_blob_extension_add_columns_all_nulls_blob_v2(tmp_path): + ds = lance.write_dataset( + pa.table({"id": range(4)}), + tmp_path / "test_add_columns_all_nulls_blob_v2", + data_storage_version="2.2", + ) + + ds.add_columns(lance.blob_field("blob")) + + assert ds.to_table(columns=["blob"]).column("blob").to_pylist() == [None] * 4 + assert ds.take_blobs("blob", indices=range(4)) == [] + + def test_blob_extension_write_fragments_external_denied_by_default(tmp_path): blob_path = tmp_path / "external_blob.bin" diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index 11851e8846e..eb165e5f612 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -1792,7 +1792,7 @@ impl FileFragment { read_columns: Option>, batch_size: Option, ) -> Result<(Fragment, Schema)> { - let (fragments, schema) = schema_evolution::add_columns_to_fragments( + let (fragments, schema, _) = schema_evolution::add_columns_to_fragments( self.dataset.as_ref(), transforms, read_columns, diff --git a/rust/lance/src/dataset/schema_evolution.rs b/rust/lance/src/dataset/schema_evolution.rs index f5d792979df..1613433c684 100644 --- a/rust/lance/src/dataset/schema_evolution.rs +++ b/rust/lance/src/dataset/schema_evolution.rs @@ -1,12 +1,16 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -use std::{collections::HashSet, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; use super::fragment::FileFragment; use super::{ Dataset, transaction::{Operation, Transaction}, + write::cleanup_data_fragments, }; use crate::{Error, Result, io::exec::Planner}; use arrow::compute::CastOptions; @@ -239,7 +243,7 @@ pub(super) async fn add_columns_to_fragments( read_columns: Option>, fragments: &[FileFragment], batch_size: Option, -) -> Result<(Vec, Schema)> { +) -> Result<(Vec, Schema, Vec)> { // Check names early (before calling add_columns_impl) to avoid extra work if // the names are wrong. let version = dataset.manifest.data_storage_format.lance_file_version()?; @@ -261,10 +265,10 @@ pub(super) async fn add_columns_to_fragments( } let transforms = optimizer.optimize(dataset, transforms)?; - let (output_schema, fragments) = match transforms { + let (output_schema, new_fragments, fragments_to_cleanup) = match transforms { NewColumnTransform::BatchUDF(udf) => { check_names(udf.output_schema.as_ref())?; - let fragments = add_columns_impl( + let result = add_columns_impl( fragments, read_columns, udf.mapper, @@ -273,7 +277,11 @@ pub(super) async fn add_columns_to_fragments( None, ) .await?; - Result::Ok((udf.output_schema, fragments)) + Result::Ok(( + udf.output_schema, + result.fragments, + result.fragments_to_cleanup, + )) } NewColumnTransform::SqlExpressions(expressions) => { // We just transform the SQL expression into a UDF backed by DataFusion @@ -336,22 +344,22 @@ pub(super) async fn add_columns_to_fragments( let mapper = Box::new(mapper); let read_columns = Some(read_schema.field_names().into_iter().cloned().collect()); - let fragments = + let result = add_columns_impl(fragments, read_columns, mapper, batch_size, None, None).await?; - Ok((output_schema, fragments)) + Ok((output_schema, result.fragments, result.fragments_to_cleanup)) } NewColumnTransform::Stream(stream) => { let output_schema = stream.schema(); check_names(output_schema.as_ref())?; let fragments = add_columns_from_stream(fragments, stream, None, batch_size).await?; - Ok((output_schema, fragments)) + Ok((output_schema, fragments.clone(), fragments)) } NewColumnTransform::Reader(reader) => { let output_schema = reader.schema(); check_names(output_schema.as_ref())?; let stream = reader.into_stream(); let fragments = add_columns_from_stream(fragments, stream, None, batch_size).await?; - Ok((output_schema, fragments)) + Ok((output_schema, fragments.clone(), fragments)) } NewColumnTransform::AllNulls(output_schema) => { check_names(output_schema.as_ref())?; @@ -379,14 +387,20 @@ pub(super) async fn add_columns_to_fragments( )); } - Ok((output_schema, fragments)) + Ok((output_schema, fragments, Vec::new())) } }?; - let mut schema = dataset.schema().merge(output_schema.as_ref())?; + let mut schema = match dataset.schema().merge(output_schema.as_ref()) { + Ok(schema) => schema, + Err(e) => { + cleanup_new_column_data_files(fragments, &fragments_to_cleanup).await; + return Err(e); + } + }; schema.set_field_id(Some(dataset.manifest.max_field_id())); - Ok((fragments, schema)) + Ok((new_fragments, schema, fragments_to_cleanup)) } pub(super) async fn add_columns( @@ -395,7 +409,7 @@ pub(super) async fn add_columns( read_columns: Option>, batch_size: Option, ) -> Result<()> { - let (fragments, schema) = add_columns_to_fragments( + let (fragments, schema, fragments_to_cleanup) = add_columns_to_fragments( dataset, transforms, read_columns, @@ -406,11 +420,71 @@ pub(super) async fn add_columns( let operation = Operation::Merge { fragments, schema }; let transaction = Transaction::new(dataset.manifest.version, operation, None); - dataset + match dataset .apply_commit(transaction, &Default::default(), &Default::default()) - .await?; + .await + { + Ok(()) => Ok(()), + Err(e) => { + cleanup_new_column_data_files(&dataset.get_fragments(), &fragments_to_cleanup).await; + Err(e) + } + } +} - Ok(()) +async fn cleanup_new_column_data_files(fragments: &[FileFragment], new_fragments: &[Fragment]) { + let Some(first_fragment) = fragments.first() else { + return; + }; + + // add_columns rewrites fragment metadata in place, so cleanup must delete + // only files created by the current attempt and must not touch pre-existing + // files that still belong to the fragment. + let original_files_by_fragment = fragments + .iter() + .map(|fragment| { + let files = fragment + .metadata + .files + .iter() + .map(|file| (file.base_id, file.path.clone())) + .collect::>(); + (fragment.id() as u64, files) + }) + .collect::>(); + + let fragments_to_cleanup = new_fragments + .iter() + .filter_map(|fragment| { + let original_files = original_files_by_fragment.get(&fragment.id)?; + let files = fragment + .files + .iter() + .filter(|file| !original_files.contains(&(file.base_id, file.path.clone()))) + .cloned() + .collect::>(); + + if files.is_empty() { + None + } else { + let mut fragment = fragment.clone(); + fragment.files = files; + Some(fragment) + } + }) + .collect::>(); + + cleanup_data_fragments( + &first_fragment.dataset().object_store, + &first_fragment.dataset().base, + &fragments_to_cleanup, + ) + .await; +} + +struct AddColumnFragments { + fragments: Vec, + fragments_to_cleanup: Vec, } #[allow(clippy::type_complexity)] @@ -421,63 +495,96 @@ async fn add_columns_impl( batch_size: Option, result_cache: Option>, schemas: Option<(Schema, Schema)>, -) -> Result> { +) -> Result { let read_columns_ref = read_columns.as_deref(); let mapper_ref = mapper.as_ref(); - let fragments = futures::stream::iter(fragments) - .then(|fragment| { - let cache_ref = result_cache.clone(); - let schemas_ref = &schemas; - async move { - if let Some(cache) = &cache_ref { - let fragment_id = fragment.id() as u32; - let fragment = cache.get_fragment(fragment_id)?; - if let Some(fragment) = fragment { - return Ok(fragment); - } + + let mut new_fragments = Vec::with_capacity(fragments.len()); + let mut fragments_to_cleanup = Vec::with_capacity(fragments.len()); + + for fragment in fragments { + if let Some(cache) = &result_cache { + let fragment_id = fragment.id() as u32; + let fragment = match cache.get_fragment(fragment_id) { + Ok(fragment) => fragment, + Err(e) => { + cleanup_new_column_data_files(fragments, &fragments_to_cleanup).await; + return Err(e); } + }; + if let Some(fragment) = fragment { + new_fragments.push(fragment); + continue; + } + } - let mut updater = fragment - .updater(read_columns_ref, schemas_ref.clone(), batch_size) - .await?; - - let mut batch_index = 0; - // TODO: the structure of the updater prevents batch-level parallelism here, - // but there is no reason why we couldn't do this in parallel. - while let Some(batch) = updater.next().await? { - let batch_info = BatchInfo { - fragment_id: fragment.id() as u32, - batch_index, - }; + let mut updater = match fragment + .updater(read_columns_ref, schemas.clone(), batch_size) + .await + { + Ok(updater) => updater, + Err(e) => { + cleanup_new_column_data_files(fragments, &fragments_to_cleanup).await; + return Err(e); + } + }; + let fragment_result = async { + let mut batch_index = 0; + // TODO: the structure of the updater prevents batch-level parallelism here, + // but there is no reason why we couldn't do this in parallel. + while let Some(batch) = updater.next().await? { + let batch_info = BatchInfo { + fragment_id: fragment.id() as u32, + batch_index, + }; - let new_batch = if let Some(cache) = &cache_ref { - if let Some(batch) = cache.get_batch(&batch_info)? { - batch - } else { - let new_batch = mapper_ref(batch)?; - cache.insert_batch(batch_info, new_batch.clone())?; - new_batch - } + let new_batch = if let Some(cache) = &result_cache { + if let Some(batch) = cache.get_batch(&batch_info)? { + batch } else { - mapper_ref(batch)? - }; + let new_batch = mapper_ref(batch)?; + cache.insert_batch(batch_info, new_batch.clone())?; + new_batch + } + } else { + mapper_ref(batch)? + }; - updater.update(new_batch).await?; - batch_index += 1; - } + updater.update(new_batch).await?; + batch_index += 1; + } - let fragment = updater.finish().await?; + let new_fragment = updater.finish().await?; + fragments_to_cleanup.push(new_fragment.clone()); - if let Some(cache) = &cache_ref { - cache.insert_fragment(fragment.clone())?; - } + if let Some(cache) = &result_cache { + // Once the checkpoint store owns this fragment, retries may load + // it back instead of rewriting it. Removing it from the cleanup + // set avoids deleting data that has already been checkpointed. + cache.insert_fragment(new_fragment.clone())?; + fragments_to_cleanup.pop(); + } - Ok::<_, Error>(fragment) + Ok::<_, Error>(new_fragment) + } + .await; + + match fragment_result { + Ok(new_fragment) => { + new_fragments.push(new_fragment); } - }) - .try_collect::>() - .await?; - Ok(fragments) + Err(e) => { + updater.cleanup_unfinished_writer().await; + cleanup_new_column_data_files(fragments, &fragments_to_cleanup).await; + return Err(e); + } + } + } + + Ok(AddColumnFragments { + fragments: new_fragments, + fragments_to_cleanup, + }) } async fn add_columns_from_stream( @@ -489,49 +596,69 @@ async fn add_columns_from_stream( let mut new_fragments = Vec::with_capacity(fragments.len()); let mut last_seen_batch: Option = None; for fragment in fragments { - let mut updater = fragment + let mut updater = match fragment .updater::(Some(&[]), schemas.clone(), batch_size) - .await?; - while let Some(batch) = updater.next().await? { - debug_assert_eq!(batch.num_columns(), 1); - let mut rows_remaining = batch.num_rows(); + .await + { + Ok(updater) => updater, + Err(e) => { + cleanup_new_column_data_files(fragments, &new_fragments).await; + return Err(e); + } + }; + let result: Result = async { + while let Some(batch) = updater.next().await? { + debug_assert_eq!(batch.num_columns(), 1); + let mut rows_remaining = batch.num_rows(); - let mut batches = Vec::new(); + let mut batches = Vec::new(); - while rows_remaining > 0 { - let next_batch = if let Some(last_seen_batch) = last_seen_batch { - last_seen_batch - } else { - stream.next().await.ok_or_else(|| { - Error::invalid_input( - "Stream ended before producing values for all rows in dataset", - ) - })?? - }; - let num_rows = next_batch.num_rows(); - if num_rows > rows_remaining { - let new_batch = next_batch.slice(0, rows_remaining); - batches.push(new_batch); - last_seen_batch = - Some(next_batch.slice(rows_remaining, num_rows - rows_remaining)); - rows_remaining = 0; - } else { - batches.push(next_batch); - rows_remaining -= num_rows; - last_seen_batch = None; + while rows_remaining > 0 { + let next_batch = if let Some(last_seen) = last_seen_batch.take() { + last_seen + } else { + stream.next().await.ok_or_else(|| { + Error::invalid_input( + "Stream ended before producing values for all rows in dataset", + ) + })?? + }; + let num_rows = next_batch.num_rows(); + if num_rows > rows_remaining { + let new_batch = next_batch.slice(0, rows_remaining); + batches.push(new_batch); + last_seen_batch = + Some(next_batch.slice(rows_remaining, num_rows - rows_remaining)); + rows_remaining = 0; + } else { + batches.push(next_batch); + rows_remaining -= num_rows; + last_seen_batch = None; + } } - } - let new_batch = - arrow_select::concat::concat_batches(&batches[0].schema(), batches.iter())?; + let new_batch = + arrow_select::concat::concat_batches(&batches[0].schema(), batches.iter())?; - updater.update(new_batch).await?; + updater.update(new_batch).await?; + } + updater.finish().await + } + .await; + + match result { + Ok(new_fragment) => new_fragments.push(new_fragment), + Err(e) => { + updater.cleanup_unfinished_writer().await; + cleanup_new_column_data_files(fragments, &new_fragments).await; + return Err(e); + } } - new_fragments.push(updater.finish().await?); } // Ensure the stream is fully consumed if last_seen_batch.is_some() || stream.next().await.is_some() { + cleanup_new_column_data_files(fragments, &new_fragments).await; return Err(Error::invalid_input_source( "Stream produced more values than expected for dataset".into(), )); @@ -653,7 +780,7 @@ pub(super) async fn alter_columns( }; let mapper = Box::new(mapper); - let fragments = add_columns_impl( + let result = add_columns_impl( &dataset.get_fragments(), Some(read_columns), mapper, @@ -666,7 +793,8 @@ pub(super) async fn alter_columns( // Some data files may no longer contain any columns in the dataset (e.g. if every // remaining column has been altered into a different data file) and so we remove them let schema_field_ids = new_schema.field_ids().into_iter().collect::>(); - let fragments = fragments + let fragments = result + .fragments .into_iter() .map(|mut frag| { frag.files.retain(|f| { @@ -762,8 +890,7 @@ pub fn exclude(source: &Schema, other: &Schema, version: &LanceFileVersion) -> R #[cfg(test)] mod test { - use std::collections::HashMap; - use std::sync::Mutex; + use std::{collections::HashMap, fs, num::NonZero, path::Path as StdPath, sync::Mutex}; use crate::dataset::WriteParams; use arrow_array::{ @@ -774,6 +901,7 @@ mod test { use arrow_schema::Fields as ArrowFields; use lance_core::utils::tempfile::TempStrDir; use lance_file::version::LanceFileVersion; + use lance_table::format::{BasePath, DataFile}; use rstest::rstest; // Used to validate that futures returned are Send. @@ -781,6 +909,47 @@ mod test { t } + fn file_paths_in(dir: impl AsRef) -> Vec { + fn collect_files( + base_dir: &StdPath, + dir: &StdPath, + files: &mut Vec, + ) -> std::io::Result<()> { + if !dir.exists() { + return Ok(()); + } + for entry in std::fs::read_dir(dir)? { + let path = entry?.path(); + if path.is_dir() { + collect_files(base_dir, &path, files)?; + } else if path.is_file() + && path + .file_name() + .and_then(|name| name.to_str()) + .is_some_and(|file_name| !file_name.starts_with('.')) + { + files.push( + path.strip_prefix(base_dir) + .unwrap() + .to_string_lossy() + .to_string(), + ); + } + } + Ok(()) + } + + let base_dir = dir.as_ref(); + let mut files = Vec::new(); + collect_files(base_dir, base_dir, &mut files).unwrap(); + files.sort(); + files + } + + fn data_file_paths_in(base_dir: &str) -> Vec { + file_paths_in(StdPath::new(base_dir).join("data")) + } + #[tokio::test] async fn test_append_columns_exprs() -> Result<()> { let num_rows = 5; @@ -864,6 +1033,623 @@ mod test { Ok(()) } + #[rstest] + #[tokio::test] + async fn test_add_columns_cleans_up_blob_v2_data_on_stream_error( + #[values( + ("inline", b"inline".to_vec()), + ("packed", vec![1u8; 128 * 1024]), + ("dedicated", vec![2u8; 5 * 1024 * 1024]), + ("external", b"external".to_vec()) + )] + blob_case: (&str, Vec), + ) -> Result<()> { + let (blob_kind, payload) = blob_case; + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "id", + DataType::Int32, + false, + )])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from_iter_values(0..1))], + )?; + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + + let test_dir = TempStrDir::default(); + let test_uri = test_dir.as_str(); + let external_dir = tempfile::tempdir()?; + let external_path = external_dir.path().join("blob.bin"); + fs::write(&external_path, &payload)?; + let external_baseline_files = file_paths_in(external_dir.path()); + let external_baseline_payload = fs::read(&external_path)?; + + let mut dataset = Dataset::write( + reader, + test_uri, + Some(WriteParams { + data_storage_version: Some(LanceFileVersion::V2_2), + initial_bases: Some(vec![BasePath::new( + 1, + external_dir.path().to_string_lossy().to_string(), + Some("external".to_string()), + false, + )]), + ..Default::default() + }), + ) + .await?; + let baseline_files = data_file_paths_in(test_uri); + + let mut blob_builder = crate::BlobArrayBuilder::new(2); + if blob_kind == "external" { + blob_builder.push_uri(external_path.to_string_lossy())?; + } else { + blob_builder.push_bytes(payload)?; + } + blob_builder.push_bytes(b"extra")?; + let blob_array = blob_builder.finish()?; + let blob_schema = Arc::new(ArrowSchema::new(vec![crate::blob_field("blob", true)])); + let blob_batch = RecordBatch::try_new(blob_schema.clone(), vec![blob_array])?; + let reader = RecordBatchIterator::new(vec![Ok(blob_batch)], blob_schema); + + let err = dataset + .add_columns(NewColumnTransform::Reader(Box::new(reader)), None, None) + .await + .unwrap_err(); + assert!( + err.to_string() + .contains("Stream produced more values than expected for dataset") + ); + + assert_eq!( + data_file_paths_in(test_uri), + baseline_files, + "add_columns should clean up new data files and blob v2 sidecars on failure" + ); + assert_eq!( + file_paths_in(external_dir.path()), + external_baseline_files, + "cleanup must not delete external files" + ); + assert_eq!( + fs::read(&external_path)?, + external_baseline_payload, + "cleanup must not modify external files" + ); + dataset.validate().await?; + + Ok(()) + } + + #[tokio::test] + async fn test_cleanup_preserves_checkpointed_fragment_files() -> Result<()> { + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "id", + DataType::Int32, + false, + )])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from_iter_values(0..2))], + )?; + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema); + + let test_dir = TempStrDir::default(); + let test_uri = test_dir.as_str(); + let mut dataset = Dataset::write( + reader, + test_uri, + Some(WriteParams { + max_rows_per_file: 1, + data_storage_version: Some(LanceFileVersion::V2_2), + ..Default::default() + }), + ) + .await?; + let original_fragments = dataset.get_fragments(); + assert_eq!(original_fragments.len(), 2); + + let data_dir = StdPath::new(test_uri).join("data"); + let cached_file = data_dir.join("checkpointed.lance"); + let cached_blob_dir = data_dir.join("checkpointed"); + fs::write(&cached_file, b"checkpointed data")?; + fs::create_dir_all(&cached_blob_dir)?; + fs::write( + cached_blob_dir.join("00000000000000000000000000000001.blob"), + b"blob", + )?; + + let mut checkpointed_fragment = original_fragments[0].metadata().clone(); + checkpointed_fragment.files.push(DataFile::new( + "checkpointed.lance", + vec![dataset.manifest.max_field_id() + 1], + vec![0], + 2, + 2, + NonZero::new(17), + None, + )); + + #[derive(Default)] + struct CheckpointedFragmentStore { + fragment: Mutex>, + } + + impl UDFCheckpointStore for CheckpointedFragmentStore { + fn get_batch(&self, _info: &BatchInfo) -> Result> { + Ok(None) + } + + fn insert_batch(&self, _info: BatchInfo, _batch: RecordBatch) -> Result<()> { + Ok(()) + } + + fn get_fragment(&self, fragment_id: u32) -> Result> { + if fragment_id == 0 { + Ok(self.fragment.lock().unwrap().clone()) + } else { + Ok(None) + } + } + + fn insert_fragment(&self, _fragment: Fragment) -> Result<()> { + Ok(()) + } + } + + let transforms = NewColumnTransform::BatchUDF(BatchUDF { + mapper: Box::new(|_| Err(Error::invalid_input("injected UDF failure"))), + output_schema: Arc::new(ArrowSchema::new(vec![ArrowField::new( + "checkpointed", + DataType::Int32, + true, + )])), + result_checkpoint: Some(Arc::new(CheckpointedFragmentStore { + fragment: Mutex::new(Some(checkpointed_fragment)), + })), + }); + + let err = dataset + .add_columns(transforms, None, None) + .await + .unwrap_err(); + assert!(err.to_string().contains("injected UDF failure")); + + assert!( + cached_file.exists(), + "cleanup must not delete fragment files restored from a checkpoint" + ); + assert!( + cached_blob_dir.exists(), + "cleanup must not delete blob sidecars restored from a checkpoint" + ); + + Ok(()) + } + + #[tokio::test] + async fn test_add_columns_cleans_current_blob_v2_writer_on_udf_error() -> Result<()> { + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "id", + DataType::Int32, + false, + )])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from_iter_values(0..2))], + )?; + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema); + + let test_dir = TempStrDir::default(); + let test_uri = test_dir.as_str(); + let mut dataset = Dataset::write( + reader, + test_uri, + Some(WriteParams { + data_storage_version: Some(LanceFileVersion::V2_2), + ..Default::default() + }), + ) + .await?; + let baseline_files = data_file_paths_in(test_uri); + + let call_count = Arc::new(Mutex::new(0usize)); + let mapper_call_count = call_count.clone(); + let output_schema = Arc::new(ArrowSchema::new(vec![crate::blob_field("blob", true)])); + let mapper = move |batch: &RecordBatch| { + let mut call_count = mapper_call_count.lock().unwrap(); + *call_count += 1; + if *call_count == 2 { + return Err(Error::invalid_input("injected UDF failure")); + } + + let mut blob_builder = crate::BlobArrayBuilder::new(batch.num_rows()); + for _ in 0..batch.num_rows() { + blob_builder.push_bytes(vec![7u8; 5 * 1024 * 1024])?; + } + Ok(RecordBatch::try_new( + Arc::new(ArrowSchema::new(vec![crate::blob_field("blob", true)])), + vec![blob_builder.finish()?], + )?) + }; + let transforms = NewColumnTransform::BatchUDF(BatchUDF { + mapper: Box::new(mapper), + output_schema, + result_checkpoint: None, + }); + + let err = dataset + .add_columns(transforms, None, Some(1)) + .await + .unwrap_err(); + assert!(err.to_string().contains("injected UDF failure")); + assert_eq!( + data_file_paths_in(test_uri), + baseline_files, + "add_columns should clean files written by the current unfinished writer" + ); + + Ok(()) + } + + #[tokio::test] + async fn test_add_columns_preserves_checkpointed_blob_v2_fragment_on_checkpoint_lookup_error() + -> Result<()> { + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "id", + DataType::Int32, + false, + )])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from_iter_values(0..2))], + )?; + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema); + + let test_dir = TempStrDir::default(); + let test_uri = test_dir.as_str(); + let mut dataset = Dataset::write( + reader, + test_uri, + Some(WriteParams { + max_rows_per_file: 1, + data_storage_version: Some(LanceFileVersion::V2_2), + ..Default::default() + }), + ) + .await?; + + struct FailingLookupStore { + inserted: Arc>>, + } + + impl UDFCheckpointStore for FailingLookupStore { + fn get_batch(&self, _info: &BatchInfo) -> Result> { + Ok(None) + } + + fn insert_batch(&self, _info: BatchInfo, _batch: RecordBatch) -> Result<()> { + Ok(()) + } + + fn get_fragment(&self, fragment_id: u32) -> Result> { + if fragment_id == 1 { + Err(Error::invalid_input("injected checkpoint lookup failure")) + } else { + Ok(None) + } + } + + fn insert_fragment(&self, fragment: Fragment) -> Result<()> { + *self.inserted.lock().unwrap() = Some(fragment); + Ok(()) + } + } + + let inserted = Arc::new(Mutex::new(None)); + let output_schema = Arc::new(ArrowSchema::new(vec![crate::blob_field("blob", true)])); + let mapper = move |batch: &RecordBatch| { + let mut blob_builder = crate::BlobArrayBuilder::new(batch.num_rows()); + for _ in 0..batch.num_rows() { + blob_builder.push_bytes(vec![7u8; 5 * 1024 * 1024])?; + } + Ok(RecordBatch::try_new( + Arc::new(ArrowSchema::new(vec![crate::blob_field("blob", true)])), + vec![blob_builder.finish()?], + )?) + }; + let transforms = NewColumnTransform::BatchUDF(BatchUDF { + mapper: Box::new(mapper), + output_schema, + result_checkpoint: Some(Arc::new(FailingLookupStore { + inserted: inserted.clone(), + })), + }); + + let err = dataset + .add_columns(transforms, None, None) + .await + .unwrap_err(); + assert!( + err.to_string() + .contains("injected checkpoint lookup failure") + ); + let inserted = inserted.lock().unwrap().clone().unwrap(); + let new_file = inserted + .files + .iter() + .find(|file| { + file.fields + .iter() + .any(|field| *field > dataset.manifest.max_field_id()) + }) + .expect("checkpoint should record the newly written data file"); + let new_file_path = StdPath::new(test_uri).join("data").join(&new_file.path); + let new_blob_dir = StdPath::new(test_uri) + .join("data") + .join(StdPath::new(&new_file.path).file_stem().unwrap()); + assert!( + new_file_path.exists(), + "cleanup must not delete data files after checkpoint takes ownership" + ); + assert!( + new_blob_dir.exists(), + "cleanup must not delete blob sidecars after checkpoint takes ownership" + ); + + Ok(()) + } + + #[tokio::test] + async fn test_add_columns_cleans_finished_blob_v2_writer_on_checkpoint_insert_error() + -> Result<()> { + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "id", + DataType::Int32, + false, + )])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from_iter_values(0..1))], + )?; + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema); + + let test_dir = TempStrDir::default(); + let test_uri = test_dir.as_str(); + let mut dataset = Dataset::write( + reader, + test_uri, + Some(WriteParams { + data_storage_version: Some(LanceFileVersion::V2_2), + ..Default::default() + }), + ) + .await?; + let baseline_files = data_file_paths_in(test_uri); + + struct FailingInsertStore; + + impl UDFCheckpointStore for FailingInsertStore { + fn get_batch(&self, _info: &BatchInfo) -> Result> { + Ok(None) + } + + fn insert_batch(&self, _info: BatchInfo, _batch: RecordBatch) -> Result<()> { + Ok(()) + } + + fn get_fragment(&self, _fragment_id: u32) -> Result> { + Ok(None) + } + + fn insert_fragment(&self, _fragment: Fragment) -> Result<()> { + Err(Error::invalid_input("injected checkpoint insert failure")) + } + } + + let output_schema = Arc::new(ArrowSchema::new(vec![crate::blob_field("blob", true)])); + let mapper = move |batch: &RecordBatch| { + let mut blob_builder = crate::BlobArrayBuilder::new(batch.num_rows()); + for _ in 0..batch.num_rows() { + blob_builder.push_bytes(vec![7u8; 5 * 1024 * 1024])?; + } + Ok(RecordBatch::try_new( + Arc::new(ArrowSchema::new(vec![crate::blob_field("blob", true)])), + vec![blob_builder.finish()?], + )?) + }; + let transforms = NewColumnTransform::BatchUDF(BatchUDF { + mapper: Box::new(mapper), + output_schema, + result_checkpoint: Some(Arc::new(FailingInsertStore)), + }); + + let err = dataset + .add_columns(transforms, None, None) + .await + .unwrap_err(); + assert!( + err.to_string() + .contains("injected checkpoint insert failure") + ); + assert_eq!( + data_file_paths_in(test_uri), + baseline_files, + "add_columns should clean finished writer files when checkpoint insert fails" + ); + + Ok(()) + } + + #[tokio::test] + async fn test_add_columns_cleans_blob_v2_files_on_declared_schema_merge_error() -> Result<()> { + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "id", + DataType::Int32, + false, + )])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from_iter_values(0..1))], + )?; + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema); + + let test_dir = TempStrDir::default(); + let test_uri = test_dir.as_str(); + let mut dataset = Dataset::write( + reader, + test_uri, + Some(WriteParams { + data_storage_version: Some(LanceFileVersion::V2_2), + ..Default::default() + }), + ) + .await?; + let baseline_files = data_file_paths_in(test_uri); + + let mapper = move |batch: &RecordBatch| { + let mut blob_builder = crate::BlobArrayBuilder::new(batch.num_rows()); + for _ in 0..batch.num_rows() { + blob_builder.push_bytes(vec![7u8; 5 * 1024 * 1024])?; + } + Ok(RecordBatch::try_new( + Arc::new(ArrowSchema::new(vec![crate::blob_field("blob", true)])), + vec![blob_builder.finish()?], + )?) + }; + let transforms = NewColumnTransform::BatchUDF(BatchUDF { + mapper: Box::new(mapper), + output_schema: Arc::new(ArrowSchema::new(vec![ + ArrowField::new("declared", DataType::Int32, true), + ArrowField::new("declared", DataType::Int32, true), + ])), + result_checkpoint: None, + }); + + let err = dataset + .add_columns(transforms, None, None) + .await + .unwrap_err(); + assert!(matches!(err, Error::Schema { .. })); + assert_eq!( + data_file_paths_in(test_uri), + baseline_files, + "add_columns should clean files written before declared schema merge fails" + ); + + Ok(()) + } + + #[tokio::test] + async fn test_add_columns_preserves_checkpointed_blob_v2_fragment_after_later_failure() + -> Result<()> { + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "id", + DataType::Int32, + false, + )])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from_iter_values(0..2))], + )?; + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema); + + let test_dir = TempStrDir::default(); + let test_uri = test_dir.as_str(); + let mut dataset = Dataset::write( + reader, + test_uri, + Some(WriteParams { + max_rows_per_file: 1, + data_storage_version: Some(LanceFileVersion::V2_2), + ..Default::default() + }), + ) + .await?; + + struct InsertThenFailStore { + inserted: Arc>>, + } + + impl UDFCheckpointStore for InsertThenFailStore { + fn get_batch(&self, info: &BatchInfo) -> Result> { + if info.fragment_id == 1 { + Err(Error::invalid_input("injected later checkpoint failure")) + } else { + Ok(None) + } + } + + fn insert_batch(&self, _info: BatchInfo, _batch: RecordBatch) -> Result<()> { + Ok(()) + } + + fn get_fragment(&self, _fragment_id: u32) -> Result> { + Ok(None) + } + + fn insert_fragment(&self, fragment: Fragment) -> Result<()> { + *self.inserted.lock().unwrap() = Some(fragment); + Ok(()) + } + } + + let inserted = Arc::new(Mutex::new(None)); + let output_schema = Arc::new(ArrowSchema::new(vec![crate::blob_field("blob", true)])); + let mapper = move |batch: &RecordBatch| { + let mut blob_builder = crate::BlobArrayBuilder::new(batch.num_rows()); + for _ in 0..batch.num_rows() { + blob_builder.push_bytes(vec![7u8; 5 * 1024 * 1024])?; + } + Ok(RecordBatch::try_new( + Arc::new(ArrowSchema::new(vec![crate::blob_field("blob", true)])), + vec![blob_builder.finish()?], + )?) + }; + let transforms = NewColumnTransform::BatchUDF(BatchUDF { + mapper: Box::new(mapper), + output_schema, + result_checkpoint: Some(Arc::new(InsertThenFailStore { + inserted: inserted.clone(), + })), + }); + + let err = dataset + .add_columns(transforms, None, None) + .await + .unwrap_err(); + assert!( + err.to_string() + .contains("injected later checkpoint failure") + ); + + let inserted = inserted.lock().unwrap().clone().unwrap(); + let new_file = inserted + .files + .iter() + .find(|file| { + file.fields + .iter() + .any(|field| *field > dataset.manifest.max_field_id()) + }) + .expect("checkpoint should record the newly written data file"); + let new_file_path = StdPath::new(test_uri).join("data").join(&new_file.path); + let new_blob_dir = StdPath::new(test_uri) + .join("data") + .join(StdPath::new(&new_file.path).file_stem().unwrap()); + assert!( + new_file_path.exists(), + "cleanup must not delete data files after checkpoint takes ownership" + ); + assert!( + new_blob_dir.exists(), + "cleanup must not delete blob sidecars after checkpoint takes ownership" + ); + + Ok(()) + } + #[rstest] #[tokio::test] async fn test_append_columns_udf( diff --git a/rust/lance/src/dataset/updater.rs b/rust/lance/src/dataset/updater.rs index b9bc34f8706..a92d97e54e9 100644 --- a/rust/lance/src/dataset/updater.rs +++ b/rust/lance/src/dataset/updater.rs @@ -6,13 +6,13 @@ use futures::StreamExt; use lance_core::datatypes::{OnMissing, OnTypeMismatch}; use lance_core::utils::deletion::DeletionVector; use lance_core::{Error, Result, datatypes::Schema}; -use lance_table::format::Fragment; +use lance_table::format::{DataFile, Fragment}; use lance_table::utils::stream::ReadBatchFutStream; use super::Dataset; use super::fragment::FragmentReader; use super::scanner::get_default_batch_size; -use super::write::{GenericWriter, open_writer}; +use super::write::{GenericWriter, cleanup_data_fragments, open_update_writer}; use crate::dataset::FileFragment; use crate::dataset::utils::SchemaAdapter; @@ -146,13 +146,7 @@ impl Updater { .data_storage_format .lance_file_version()?; - open_writer( - &self.fragment.dataset().object_store, - &schema, - &self.fragment.dataset().base, - data_storage_version, - ) - .await + open_update_writer(self.dataset(), &schema, data_storage_version).await } /// Update one batch. @@ -221,6 +215,37 @@ impl Updater { Ok(self.fragment.metadata().clone()) } + /// Clean up any data file and blob sidecars created by the current unfinished writer. + pub(super) async fn cleanup_unfinished_writer(&mut self) { + let Some(writer) = self.writer.as_ref() else { + return; + }; + let (path, base_id) = writer.data_file_path(); + if path.is_empty() { + return; + } + + let mut fragment = Fragment::new(self.fragment.id() as u64); + // cleanup_data_fragments only needs path/base_id to remove the unfinished + // data file and any blob sidecars. Build a minimal synthetic fragment so + // we can reuse the shared cleanup path without fabricating full metadata. + fragment.files.push(DataFile::new( + path.to_string(), + vec![], + vec![], + 0, + 0, + None, + base_id, + )); + cleanup_data_fragments( + &self.dataset().object_store, + &self.dataset().base, + &[fragment], + ) + .await; + } + /// Get the final schema of the fragment after the update. /// /// This may be None if the schema is not known. This can happen if it was diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index 1e73618fc6b..b42b1f1cba9 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -1021,6 +1021,8 @@ pub async fn write_fragments_internal( pub trait GenericWriter: Send { /// Write the given batches to the file async fn write(&mut self, batches: &[RecordBatch]) -> Result<()>; + /// Get the file path and base ID for the data file being written. + fn data_file_path(&self) -> (&str, Option); /// Get the current position in the file /// /// We use this to know when the file is too large and we need to start @@ -1047,6 +1049,9 @@ where async fn write(&mut self, batches: &[RecordBatch]) -> Result<()> { self.writer.write(batches).await } + fn data_file_path(&self) -> (&str, Option) { + (&self.path, self.base_id) + } async fn tell(&mut self) -> Result { Ok(self.writer.tell().await? as u64) } @@ -1087,6 +1092,9 @@ impl GenericWriter for V2WriterAdapter { } Ok(()) } + fn data_file_path(&self) -> (&str, Option) { + (&self.path, self.base_id) + } async fn tell(&mut self) -> Result { Ok(self.writer.tell().await?) } @@ -1140,6 +1148,39 @@ pub async fn open_writer( .await } +pub(super) async fn open_update_writer( + dataset: &Dataset, + schema: &Schema, + storage_version: LanceFileVersion, +) -> Result> { + // add_columns / alter_columns reuse the normal writer stack, but they do not + // flow through WriteParams. Rebuild the external base resolver here so blob + // v2 reference columns can resolve dataset-registered external URIs. + let external_base_resolver = if storage_version >= LanceFileVersion::V2_2 + && schema.fields.iter().any(|f| f.is_blob_v2()) + { + Some(Arc::new( + build_external_base_resolver(Some(dataset), &WriteParams::default()).await?, + )) + } else { + None + }; + + open_writer_with_options( + &dataset.object_store, + schema, + &dataset.base, + storage_version, + WriterOptions { + add_data_dir: true, + external_base_resolver, + source_store_registry: dataset.session.store_registry(), + ..Default::default() + }, + ) + .await +} + #[derive(Default)] struct WriterOptions { add_data_dir: bool,