diff --git a/tests/test_append.py b/tests/test_append.py index 5237c2f..1027ce1 100644 --- a/tests/test_append.py +++ b/tests/test_append.py @@ -73,27 +73,54 @@ def test_append(): ) -def test_append_fail_num_variants_mismatch(): - vcz1 = make_vcz([0, 0], [1, 2], [["A", "T"], ["A", "G"]]) - vcz2 = make_vcz([0], [1], [["A", "C"]]) - - with pytest.raises( - ValueError, - match="Stores being appended must have same number of variants. " - "First has 2, second has 1", - ): - append(vcz1, vcz2) +def test_append_with_normalise(): + vcz1 = make_vcz( + variant_contig=[0, 0, 0, 0], + variant_position=[1, 2, 3, 4], + alleles=[ + ["A", "T"], + ["A", "C"], + ["A", "G"], + ["A", "C"], + ], + sample_id=["S1"], + call_genotype=[ + [[1, 0]], + [[1, 0]], + [[0, 1]], + [[1, 1]], + ], + ) + vcz2 = make_vcz( + variant_contig=[0, 0, 0], + variant_position=[1, 2, 4], + alleles=[ + ["A", "T"], + ["A", "C"], + ["A", "C"], + ], + sample_id=["S2"], + call_genotype=[ + [[0, 0]], + [[0, 1]], + [[0, 0]], + ], + ) -def test_append_fail_alleles_mismatch(): - vcz1 = make_vcz([0], [1], [["A", "T"]]) - vcz2 = make_vcz([0], [1], [["A", "C"]]) + append(vcz1, vcz2) - with pytest.raises( - ValueError, - match="Stores being appended must have same values for field 'variant_allele'", - ): - append(vcz1, vcz2) + root1 = zarr.open(vcz1) + assert_array_equal(root1["sample_id"][:], ["S1", "S2"]) + assert_array_equal( + root1["call_genotype"][:], + [ + [[1, 0], [0, 0]], + [[1, 0], [0, 1]], + [[0, 1], [-1, -1]], + [[1, 1], [0, 0]], + ], + ) def test_append_fails_for_misaligned_call_chunks(): diff --git a/tests/test_cli.py b/tests/test_cli.py index f6332fb..94df6be 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -350,6 +350,4 @@ def test_append_cli_reports_real_validation_error(tmp_path): ) assert result.exit_code == 1 - assert "Error: Stores being appended must have same number of variants" in ( - result.output - ) + assert "Error: contig_id fields must be identical" in result.output diff --git a/tests/test_vcf_roundtrip.py b/tests/test_vcf_roundtrip.py index 6906358..bc53043 100644 --- a/tests/test_vcf_roundtrip.py +++ b/tests/test_vcf_roundtrip.py @@ -23,12 +23,9 @@ # tests/data/vcf/sample-variants.vcf.gz import pytest -import zarr from vczstore.append import append -from vczstore.normalise import normalise from vczstore.remove import remove -from vczstore.utils import transaction from .utils import ( check_removed_sample, @@ -204,15 +201,12 @@ def test_normalise_and_append(tmp_path, backend_storage, zarr_format): backend_storage=backend_storage, ) vcz1 = convert_vcf_to_vcz("sample-part1.vcf.gz", tmp_path, zarr_format=zarr_format) - vcz1_norm = zarr.storage.MemoryStore() backend_storage_option = ( "" if backend_storage is None else f"--backend-storage {backend_storage}" ) - with transaction(vcz0, backend_storage=backend_storage) as store: - normalise(store, vcz1, vcz1_norm) - append(store, vcz1_norm) + append(vcz0, vcz1, backend_storage=backend_storage) # check equivalence with original VCF compare_vcf_and_vcz( diff --git a/vczstore/append.py b/vczstore/append.py index 16683fa..8f28105 100644 --- a/vczstore/append.py +++ b/vczstore/append.py @@ -1,5 +1,7 @@ import logging import os +import pathlib +import tempfile from contextlib import suppress from itertools import product @@ -9,11 +11,27 @@ from vcztools.utils import array_dims, open_zarr from zarr.core.sync import sync +from vczstore.normalise import normalise from vczstore.utils import compute_min_variants_chunk_size, copy_store, transaction logger = logging.getLogger(__name__) +def _require_normalise(root1, root2): + n_variants1 = root1["variant_contig"].shape[0] + n_variants2 = root2["variant_contig"].shape[0] + if n_variants1 != n_variants2: + return True + fields = ["contig_id", "variant_contig", "variant_position"] + if "normalise_new_alleles" not in root2["variant_allele"].attrs: + # normalise has not been called + fields.append("variant_allele") + for field in fields: + if not np.array_equal(root1[field][:], root2[field][:]): + return True + return False + + def _assert_append_arrays_compatible(name, arr1, arr2): dims1 = array_dims(arr1) dims2 = array_dims(arr2) @@ -92,13 +110,22 @@ async def copy_chunk(src_coords): ) +def temp_norm_path(prefix=None): + with tempfile.TemporaryDirectory(prefix=prefix) as tmp: + return pathlib.Path(tmp) / "vcz_norm" + + def append( vcz1, vcz2, *, + haploid_contigs=None, + allow_new_alleles=False, + variant_chunks_in_batch=None, + show_progress=False, + backend_storage=None, io_concurrency=None, require_direct_copy=False, - backend_storage=None, ): """Append vcz2 to vcz1 in place""" if io_concurrency is None: @@ -110,11 +137,6 @@ def append( root1 = open_zarr(vcz1, mode="r+", backend_storage=backend_storage) root2 = zarr.open(vcz2, mode="r") # assume local - # normalise will set the 'normalise_new_alleles' flag if there are new alleles - normalise_new_alleles = root2["variant_allele"].attrs.get( - "normalise_new_alleles", False - ) - # check preconditions sample_id1 = root1["sample_id"] sample_id2 = root2["sample_id"] @@ -122,21 +144,21 @@ def append( if common_samples.shape[0] > 0: raise ValueError(f"Duplicate samples found: {common_samples}") - n_variants1 = root1["variant_contig"].shape[0] - n_variants2 = root2["variant_contig"].shape[0] - if n_variants1 != n_variants2: - raise ValueError( - "Stores being appended must have same number of variants. " - f"First has {n_variants1}, second has {n_variants2}" + if _require_normalise(root1, root2): + # TODO: use a context manager to delete temp path after use + vcz2_norm = temp_norm_path(prefix="vczstore") + normalise( + vcz1, + vcz2, + vcz2_norm, + haploid_contigs=haploid_contigs, + allow_new_alleles=allow_new_alleles, + variant_chunks_in_batch=variant_chunks_in_batch, + show_progress=show_progress, + backend_storage=backend_storage, ) - fields = ["contig_id", "variant_contig", "variant_position"] - if not normalise_new_alleles: - fields.append("variant_allele") - for field in fields: - if not np.array_equal(root1[field][:], root2[field][:]): - raise ValueError( - f"Stores being appended must have same values for field '{field}'" - ) + vcz2 = vcz2_norm + root2 = zarr.open(vcz2, mode="r") # assume local min_chunk_size1 = compute_min_variants_chunk_size(root1) min_chunk_size2 = compute_min_variants_chunk_size(root2) @@ -223,6 +245,11 @@ def append( arr2[:, direct_count:incoming_num_samples, ...] ) + # normalise will set the 'normalise_new_alleles' flag if there are new alleles + normalise_new_alleles = root2["variant_allele"].attrs.get( + "normalise_new_alleles", False + ) + if normalise_new_alleles: logger.info("Overwriting variant_allele array since new alleles present") copy_store(vcz2, vcz1, array_keys=["variant_allele"]) diff --git a/vczstore/cli.py b/vczstore/cli.py index 8dfd047..45c3c3d 100644 --- a/vczstore/cli.py +++ b/vczstore/cli.py @@ -51,6 +51,15 @@ def call_or_error(function, *args, **kwargs): help="Zarr backend storage to use; one of 'obstore' or 'icechunk'.", ) +allow_new_alleles = click.option( + "--allow-new-alleles", + is_flag=True, + help=( + "If new alleles are found at a variant site in vcz2 the variant_allele array " + "is updated, otherwise the operation fails if not specified." + ), +) + variant_chunks_in_batch = click.option( "--variant-chunks-in-batch", type=click.IntRange(min=1), @@ -71,7 +80,10 @@ def call_or_error(function, *args, **kwargs): @click.command() @click.argument("vcz1", type=click.Path()) @click.argument("vcz2", type=click.Path()) +@allow_new_alleles +@variant_chunks_in_batch @verbose +@progress @backend_storage @io_concurrency @click.option( @@ -82,16 +94,29 @@ def call_or_error(function, *args, **kwargs): "This requires a sample chunk-aligned destination and incoming sample count." ), ) -def append(vcz1, vcz2, verbose, backend_storage, io_concurrency, require_direct_copy): +def append( + vcz1, + vcz2, + allow_new_alleles, + variant_chunks_in_batch, + verbose, + progress, + backend_storage, + io_concurrency, + require_direct_copy, +): """Append vcz2 to vcz1 in place""" setup_logging(verbose) call_or_error( append_function, vcz1, vcz2, + allow_new_alleles=allow_new_alleles, + variant_chunks_in_batch=variant_chunks_in_batch, + show_progress=progress, + backend_storage=backend_storage, io_concurrency=io_concurrency, require_direct_copy=require_direct_copy, - backend_storage=backend_storage, ) @@ -124,14 +149,7 @@ def create(vcz_out, vczs, samples_chunk_size, verbose, progress, backend_storage @click.argument("vcz1", type=click.Path()) @click.argument("vcz2", type=click.Path()) @click.argument("vcz2_norm", type=click.Path()) -@click.option( - "--allow-new-alleles", - is_flag=True, - help=( - "If new alleles are found at a variant site in vcz2 the variant_allele array " - "is updated, otherwise the operation fails if not specified." - ), -) +@allow_new_alleles @variant_chunks_in_batch @verbose @progress