Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 45 additions & 18 deletions tests/test_append.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,27 +73,54 @@ def test_append():
)


def test_append_fail_num_variants_mismatch():
vcz1 = make_vcz([0, 0], [1, 2], [["A", "T"], ["A", "G"]])
vcz2 = make_vcz([0], [1], [["A", "C"]])

with pytest.raises(
ValueError,
match="Stores being appended must have same number of variants. "
"First has 2, second has 1",
):
append(vcz1, vcz2)
def test_append_with_normalise():
vcz1 = make_vcz(
variant_contig=[0, 0, 0, 0],
variant_position=[1, 2, 3, 4],
alleles=[
["A", "T"],
["A", "C"],
["A", "G"],
["A", "C"],
],
sample_id=["S1"],
call_genotype=[
[[1, 0]],
[[1, 0]],
[[0, 1]],
[[1, 1]],
],
)

vcz2 = make_vcz(
variant_contig=[0, 0, 0],
variant_position=[1, 2, 4],
alleles=[
["A", "T"],
["A", "C"],
["A", "C"],
],
sample_id=["S2"],
call_genotype=[
[[0, 0]],
[[0, 1]],
[[0, 0]],
],
)

def test_append_fail_alleles_mismatch():
vcz1 = make_vcz([0], [1], [["A", "T"]])
vcz2 = make_vcz([0], [1], [["A", "C"]])
append(vcz1, vcz2)

with pytest.raises(
ValueError,
match="Stores being appended must have same values for field 'variant_allele'",
):
append(vcz1, vcz2)
root1 = zarr.open(vcz1)
assert_array_equal(root1["sample_id"][:], ["S1", "S2"])
assert_array_equal(
root1["call_genotype"][:],
[
[[1, 0], [0, 0]],
[[1, 0], [0, 1]],
[[0, 1], [-1, -1]],
[[1, 1], [0, 0]],
],
)


def test_append_fails_for_misaligned_call_chunks():
Expand Down
4 changes: 1 addition & 3 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,4 @@ def test_append_cli_reports_real_validation_error(tmp_path):
)

assert result.exit_code == 1
assert "Error: Stores being appended must have same number of variants" in (
result.output
)
assert "Error: contig_id fields must be identical" in result.output
8 changes: 1 addition & 7 deletions tests/test_vcf_roundtrip.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,9 @@
# tests/data/vcf/sample-variants.vcf.gz

import pytest
import zarr

from vczstore.append import append
from vczstore.normalise import normalise
from vczstore.remove import remove
from vczstore.utils import transaction

from .utils import (
check_removed_sample,
Expand Down Expand Up @@ -204,15 +201,12 @@ def test_normalise_and_append(tmp_path, backend_storage, zarr_format):
backend_storage=backend_storage,
)
vcz1 = convert_vcf_to_vcz("sample-part1.vcf.gz", tmp_path, zarr_format=zarr_format)
vcz1_norm = zarr.storage.MemoryStore()

backend_storage_option = (
"" if backend_storage is None else f"--backend-storage {backend_storage}"
)

with transaction(vcz0, backend_storage=backend_storage) as store:
normalise(store, vcz1, vcz1_norm)
append(store, vcz1_norm)
append(vcz0, vcz1, backend_storage=backend_storage)

# check equivalence with original VCF
compare_vcf_and_vcz(
Expand Down
67 changes: 47 additions & 20 deletions vczstore/append.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import logging
import os
import pathlib
import tempfile
from contextlib import suppress
from itertools import product

Expand All @@ -9,11 +11,27 @@
from vcztools.utils import array_dims, open_zarr
from zarr.core.sync import sync

from vczstore.normalise import normalise
from vczstore.utils import compute_min_variants_chunk_size, copy_store, transaction

logger = logging.getLogger(__name__)


def _require_normalise(root1, root2):
n_variants1 = root1["variant_contig"].shape[0]
n_variants2 = root2["variant_contig"].shape[0]
if n_variants1 != n_variants2:
return True
fields = ["contig_id", "variant_contig", "variant_position"]
if "normalise_new_alleles" not in root2["variant_allele"].attrs:
# normalise has not been called
fields.append("variant_allele")
for field in fields:
if not np.array_equal(root1[field][:], root2[field][:]):
return True
return False


def _assert_append_arrays_compatible(name, arr1, arr2):
dims1 = array_dims(arr1)
dims2 = array_dims(arr2)
Expand Down Expand Up @@ -92,13 +110,22 @@ async def copy_chunk(src_coords):
)


def temp_norm_path(prefix=None):
with tempfile.TemporaryDirectory(prefix=prefix) as tmp:
return pathlib.Path(tmp) / "vcz_norm"


def append(
vcz1,
vcz2,
*,
haploid_contigs=None,
allow_new_alleles=False,
variant_chunks_in_batch=None,
show_progress=False,
backend_storage=None,
io_concurrency=None,
require_direct_copy=False,
backend_storage=None,
):
"""Append vcz2 to vcz1 in place"""
if io_concurrency is None:
Expand All @@ -110,33 +137,28 @@ def append(
root1 = open_zarr(vcz1, mode="r+", backend_storage=backend_storage)
root2 = zarr.open(vcz2, mode="r") # assume local

# normalise will set the 'normalise_new_alleles' flag if there are new alleles
normalise_new_alleles = root2["variant_allele"].attrs.get(
"normalise_new_alleles", False
)

# check preconditions
sample_id1 = root1["sample_id"]
sample_id2 = root2["sample_id"]
common_samples = np.intersect1d(sample_id1, sample_id2)
if common_samples.shape[0] > 0:
raise ValueError(f"Duplicate samples found: {common_samples}")

n_variants1 = root1["variant_contig"].shape[0]
n_variants2 = root2["variant_contig"].shape[0]
if n_variants1 != n_variants2:
raise ValueError(
"Stores being appended must have same number of variants. "
f"First has {n_variants1}, second has {n_variants2}"
if _require_normalise(root1, root2):
# TODO: use a context manager to delete temp path after use
vcz2_norm = temp_norm_path(prefix="vczstore")
normalise(
vcz1,
vcz2,
vcz2_norm,
haploid_contigs=haploid_contigs,
allow_new_alleles=allow_new_alleles,
variant_chunks_in_batch=variant_chunks_in_batch,
show_progress=show_progress,
backend_storage=backend_storage,
)
fields = ["contig_id", "variant_contig", "variant_position"]
if not normalise_new_alleles:
fields.append("variant_allele")
for field in fields:
if not np.array_equal(root1[field][:], root2[field][:]):
raise ValueError(
f"Stores being appended must have same values for field '{field}'"
)
vcz2 = vcz2_norm
root2 = zarr.open(vcz2, mode="r") # assume local

min_chunk_size1 = compute_min_variants_chunk_size(root1)
min_chunk_size2 = compute_min_variants_chunk_size(root2)
Expand Down Expand Up @@ -223,6 +245,11 @@ def append(
arr2[:, direct_count:incoming_num_samples, ...]
)

# normalise will set the 'normalise_new_alleles' flag if there are new alleles
normalise_new_alleles = root2["variant_allele"].attrs.get(
"normalise_new_alleles", False
)

if normalise_new_alleles:
logger.info("Overwriting variant_allele array since new alleles present")
copy_store(vcz2, vcz1, array_keys=["variant_allele"])
38 changes: 28 additions & 10 deletions vczstore/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ def call_or_error(function, *args, **kwargs):
help="Zarr backend storage to use; one of 'obstore' or 'icechunk'.",
)

allow_new_alleles = click.option(
"--allow-new-alleles",
is_flag=True,
help=(
"If new alleles are found at a variant site in vcz2 the variant_allele array "
"is updated, otherwise the operation fails if not specified."
),
)

variant_chunks_in_batch = click.option(
"--variant-chunks-in-batch",
type=click.IntRange(min=1),
Expand All @@ -71,7 +80,10 @@ def call_or_error(function, *args, **kwargs):
@click.command()
@click.argument("vcz1", type=click.Path())
@click.argument("vcz2", type=click.Path())
@allow_new_alleles
@variant_chunks_in_batch
@verbose
@progress
@backend_storage
@io_concurrency
@click.option(
Expand All @@ -82,16 +94,29 @@ def call_or_error(function, *args, **kwargs):
"This requires a sample chunk-aligned destination and incoming sample count."
),
)
def append(vcz1, vcz2, verbose, backend_storage, io_concurrency, require_direct_copy):
def append(
vcz1,
vcz2,
allow_new_alleles,
variant_chunks_in_batch,
verbose,
progress,
backend_storage,
io_concurrency,
require_direct_copy,
):
"""Append vcz2 to vcz1 in place"""
setup_logging(verbose)
call_or_error(
append_function,
vcz1,
vcz2,
allow_new_alleles=allow_new_alleles,
variant_chunks_in_batch=variant_chunks_in_batch,
show_progress=progress,
backend_storage=backend_storage,
io_concurrency=io_concurrency,
require_direct_copy=require_direct_copy,
backend_storage=backend_storage,
)


Expand Down Expand Up @@ -124,14 +149,7 @@ def create(vcz_out, vczs, samples_chunk_size, verbose, progress, backend_storage
@click.argument("vcz1", type=click.Path())
@click.argument("vcz2", type=click.Path())
@click.argument("vcz2_norm", type=click.Path())
@click.option(
"--allow-new-alleles",
is_flag=True,
help=(
"If new alleles are found at a variant site in vcz2 the variant_allele array "
"is updated, otherwise the operation fails if not specified."
),
)
@allow_new_alleles
@variant_chunks_in_batch
@verbose
@progress
Expand Down
Loading