Direct Zarr IO#355
Draft
jeromekelleher wants to merge 3 commits into
Draft
Conversation
Owns the path from (zarr.Array, chunk_coords) to np.ndarray on top of Zarr's storage and codec layers, without going through arr.blocks[idx] (which spins up a fresh asyncio loop per call). The output matches arr.blocks[coords] byte-for-byte across both Zarr v2 and v3 metadata, including boundary trim and missing-chunk fill values. Sharded arrays are explicitly unsupported and refused at construction. Verified by parity tests against synthetic fixtures (every codec) and real bio2zarr-produced VCZ fixtures (sample.vcz, sample.vcz3, field_type_combos). Nothing in the production code path uses BlockReader yet — the integration lands in the next PR.
VczReader now owns a single anyio BlockingPortal (started lazily on first variant_chunks() call) and a CapacityLimiter sized to DEFAULT_DECODE_THREADS (default os.cpu_count()). The 32-worker ThreadPoolExecutor still schedules block reads cross-chunk; each worker now blocks on portal.call(_read_block_async, ...) instead of arr.blocks[idx], so fetch and decode flow through the new BlockReader path that PR 1 introduced. ReadaheadPipeline takes the portal, decode_limiter, and a get_block_reader callable. BlockReadTemplate carries a BlockReader instead of a raw zarr.Array. _read_block_async handles slab fetches (slice(None) over non-variants axes) by resolving slices via BlockReader.cdata_shape, fetching every chunk concurrently inside an anyio.create_task_group, and assembling with np.block. VczReader gains a thread-safe _ensure_portal() (so concurrent variant_chunks() callers don't race on portal startup), a per-field _get_block_reader() cache, and an explicit close() that tears down both the executor and the portal. The full test suite passes unchanged. Performance benchmarks against the four backends are deferred to before merging — the dataset isn't pre-generated and a full sweep belongs in a separate review step.
The ThreadPoolExecutor-based ReadaheadPipeline and the _PrefetchIterator wrapper around _variant_chunks_gen are gone. In their place: - _produce_variant_chunks: async producer running on the reader's BlockingPortal. An outer anyio task group manages variant-chunk fetches; each fetch task uses an inner task group to fan out the field reads concurrently. The byte-budget refill semantics (bootstrap chunk runs solo, subsequent chunks scheduled until the in-flight count exceeds readahead_bytes / per-chunk-bytes) are preserved. After fetching, the producer applies the variant filter, materialises the output dict, and sends it through a 1-buffer MemoryObjectStream. Telemetry — max_in_flight, last_chunk_bytes, the final iteration log line — is reported via a shared dict. - _AsyncBackedIterator: sync iterator wrapping the channel via portal.call. close() cancels the producer task and shuts the channel; __del__ closes defensively. BaseExceptionGroup is unwrapped to a single exception so handle_exception in cli.py still surfaces the original ValueError. - weakref.finalize arms close on garbage collection. Without it the portal's daemon thread joins on the asyncio default executor's non-daemon decode workers and wedges process exit when the user forgets to use the reader as a context manager. CLI: --readahead-workers is removed; --io-concurrency caps concurrent store.get calls (default 32) and --decode-threads sizes the decode pool (default os.cpu_count()), separating IO from CPU concurrency. Tests: TestReadaheadPipeline, TestPrefetchIteratorDirect, and the _DepthTrackingPipeline / _make_pipeline / _shared_test_portal helpers are deleted. A new TestVariantChunksIterator covers eager validation, empty-fields short-circuit, exception propagation, close cancellation, and max_in_flight semantics through the public variant_chunks() API. The static-field-not-in-pipeline check now monkeypatches _read_block_async. Performance benchmarks against the four backends are deferred to before merging.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
This is an experiment in which we perform direct chunk IO using the Zarr store interface and decoding APIs. We run the async IOs in our own async loop and hand off to a locally managed decoder pool.
I did it to see what the result would look like. I think we would eventually have to do something like this to optimise queries of different shapes, and ultimately have a straightforward per-chunk pipeline of operations applied synchronously once the chunk has been decoded, in the dedicated thread.
In the short term the complexity isn't worth it, as the current code is mostly working well.
Opening a draft PR so I don't lose the branch.
I'm sure the current implementation is overly complex and could be simplified considerably.