From 531283ad485e8f37f942bf9a13f911be63be2db3 Mon Sep 17 00:00:00 2001 From: jameswillis Date: Fri, 29 May 2026 00:16:40 -0700 Subject: [PATCH 1/4] feat(sedona-raster-zarr): async-native cloud storage backends via object_store MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ZarrChunkReader now accepts file://, bare paths, s3://, gs:// / gcs://, az:// / abfs:// / abfss://, and http:// / https://. The reader's public API is pure-storage-in: hand it an Arc and a group URI (for chunk-anchor URI formatting) and it walks the chunk grid. Where the storage came from — registry lookup, env-var-built builder, an in-memory test fixture — is the caller's concern, not the reader's. * `open_storage_from_uri(uri, store_override: Option>)` is the helper for callers that don't already hold a credentialed store. `Some(store)` uses it directly (with PrefixStore for s3/gs/az; http(s) is rooted at the URL by its builder). `None` falls back to per-scheme *Builder::from_env().with_url(uri).build() — the same env-var credential discovery that read_format's orchestrated path gets via ensure_object_store_registered_with_options. file:// and bare paths always go through zarrs_filesystem's FilesystemStore wrapped in SyncToAsyncStorageAdapter, so the local backend shows up on the same async storage surface. * ZarrChunkReader::try_new is async; the sync RecordBatchReader streaming surface is unchanged since next() is pure CPU. * No dedicated tokio runtime; async tasks run on whatever runtime the caller is on (DataFusion's executor for the future SQL UDTF, an ad-hoc current-thread runtime block_on'd in PyZarrChunkReader::new for the Python FFI, #[tokio::test] in tests). * PyZarrChunkReader::new builds storage via open_storage_from_uri (env-var credentials by default), then drives the async try_new through a local current-thread tokio runtime. No synthetic RuntimeEnv, no datafusion-execution dep on the FFI cdylib. * Replaces Group::child_arrays (which opens every child up front and errors hard on any per-array metadata failure) with two purpose- built paths driven by the caller's arrays filter: - arrays: Some([...]) opens each by name with Array::async_open. No listing — usable against backends that can't list (plain HTTPS without WebDAV, S3-via-HttpStore). - arrays: None lists direct children with storage.list_dir, then Array::async_open each. Per-array open failures are logged at warn! and skipped, so a single malformed sibling (e.g. an xarray-style fixed-length-Unicode coord variable with a null fill_value that zarrs 0.23 can't open) no longer poisons the rest of the group. * zarrs_object_store pinned to 0.5 in the workspace; 0.6 depends on object_store 0.13, semver-incompatible with DataFusion 52's object_store 0.12.x. * Cloud smoke tests pass strictly against the public anonymous ITS_LIVE v2 ice-velocity datacubes (s3://its-live-data/...). Same bucket via s3:// (AmazonS3Builder) and via the virtual-hosted HTTPS URL (HttpStore), with an explicit M11/M12 filter to avoid listing on the HTTPS path. --- Cargo.lock | 40 +++- Cargo.toml | 1 + python/sedonadb-zarr/Cargo.toml | 3 + python/sedonadb-zarr/src/lib.rs | 108 ++++++++- rust/sedona-raster-zarr/Cargo.toml | 13 +- rust/sedona-raster-zarr/src/lib.rs | 1 + rust/sedona-raster-zarr/src/loader.rs | 219 ++++++++++-------- rust/sedona-raster-zarr/src/source_uri.rs | 115 +++++---- rust/sedona-raster-zarr/tests/cloud.rs | 127 ++++++++++ .../tests/zarr_roundtrip.rs | 73 +++--- 10 files changed, 521 insertions(+), 179 deletions(-) create mode 100644 rust/sedona-raster-zarr/tests/cloud.rs diff --git a/Cargo.lock b/Cargo.lock index 7967387a5..1e27cb5da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -553,6 +553,17 @@ dependencies = [ "abi_stable", ] +[[package]] +name = "async-generic" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddf3728566eefa873833159754f5732fb0951d3649e6e5b891cc70d56dd41673" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "async-lock" version = "3.4.2" @@ -4182,10 +4193,13 @@ version = "0.12.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "957228ad12042ee839f93c8f257b62b4c0ab5eaae1d4fa60de53b27c9d7c5046" dependencies = [ + "async-lock", "crossbeam-channel", "crossbeam-epoch", "crossbeam-utils", "equivalent", + "event-listener", + "futures-util", "parking_lot", "portable-atomic", "smallvec", @@ -6148,9 +6162,12 @@ version = "0.4.0" dependencies = [ "arrow-array", "arrow-schema", + "async-trait", + "datafusion", "datafusion-common", "futures", "log", + "object_store", "sedona-common", "sedona-raster", "sedona-schema", @@ -6158,8 +6175,10 @@ dependencies = [ "serde_json", "tempfile", "tokio", + "url", "zarrs", "zarrs_filesystem", + "zarrs_object_store", ] [[package]] @@ -6397,8 +6416,11 @@ name = "sedonadb-zarr" version = "0.4.0" dependencies = [ "arrow-array", + "object_store", "pyo3", "sedona-raster-zarr", + "tokio", + "url", ] [[package]] @@ -7944,7 +7966,9 @@ version = "0.23.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "251f4ec1a9e60ca9c693ade9b29a0b981a61e68ea3e2ae85fa9da31bcdca5dbe" dependencies = [ + "async-generic", "async-lock", + "async-trait", "base64", "blosc-src", "blusc", @@ -7953,6 +7977,7 @@ dependencies = [ "crc32c", "derive_more", "flate2", + "futures", "getrandom 0.3.4", "half", "inventory", @@ -7979,7 +8004,6 @@ dependencies = [ "zarrs_chunk_key_encoding", "zarrs_codec", "zarrs_data_type", - "zarrs_filesystem", "zarrs_metadata", "zarrs_metadata_ext", "zarrs_plugin", @@ -8104,6 +8128,18 @@ dependencies = [ "zarrs_metadata", ] +[[package]] +name = "zarrs_object_store" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5d9d0d3db426dd50dfb0b5d7cc1660bda368caeb5cd8645c60c46bc4f261a19" +dependencies = [ + "async-trait", + "futures", + "object_store", + "zarrs_storage", +] + [[package]] name = "zarrs_plugin" version = "0.4.1" @@ -8122,9 +8158,11 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7d098796d2ed4cf94896569615101e0432e870a7665396da5cc32300fb68f7c1" dependencies = [ + "async-trait", "auto_impl", "bytes", "derive_more", + "futures", "itertools 0.14.0", "thiserror 2.0.17", "unsafe_cell_slice", diff --git a/Cargo.toml b/Cargo.toml index 3fd9b71b5..dce8c9596 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -137,6 +137,7 @@ wkb = "0.9.2" wkt = "0.14.0" zarrs = { version = "0.23", default-features = false } zarrs_filesystem = "0.3" +zarrs_object_store = "0.5" # Workspace path dependencies for internal crates sedona = { version = "0.4.0", path = "rust/sedona" } diff --git a/python/sedonadb-zarr/Cargo.toml b/python/sedonadb-zarr/Cargo.toml index 2a4a939c7..072e652ad 100644 --- a/python/sedonadb-zarr/Cargo.toml +++ b/python/sedonadb-zarr/Cargo.toml @@ -29,5 +29,8 @@ doc = false [dependencies] arrow-array = { workspace = true, features = ["ffi"] } +object_store = { workspace = true, features = ["aws", "gcp", "azure", "http"] } pyo3 = { version = "0.25.1" } sedona-raster-zarr = { workspace = true } +tokio = { workspace = true, features = ["rt"] } +url = { workspace = true } diff --git a/python/sedonadb-zarr/src/lib.rs b/python/sedonadb-zarr/src/lib.rs index 0f70c8fa0..8d2d9c135 100644 --- a/python/sedonadb-zarr/src/lib.rs +++ b/python/sedonadb-zarr/src/lib.rs @@ -20,13 +20,16 @@ //! Python-side `ZarrFormatSpec`. use std::ffi::CString; -use std::sync::Mutex; +use std::sync::{Arc, Mutex}; use arrow_array::ffi_stream::FFI_ArrowArrayStream; +use object_store::ObjectStore; use pyo3::exceptions::{PyRuntimeError, PyValueError}; use pyo3::prelude::*; use pyo3::types::PyCapsule; -use sedona_raster_zarr::ZarrChunkReader; +use sedona_raster_zarr::{open_storage_from_uri, ZarrChunkReader}; +use tokio::runtime::Builder; +use url::Url; /// Single-use `__arrow_c_stream__` wrapper around `ZarrChunkReader`. #[pyclass] @@ -39,7 +42,24 @@ impl PyZarrChunkReader { #[new] #[pyo3(signature = (uri, arrays=None, batch_size=8192))] fn new(uri: &str, arrays: Option>, batch_size: usize) -> PyResult { - let reader = ZarrChunkReader::try_new(uri, arrays.as_deref(), batch_size) + let store = + default_object_store_for_uri(uri).map_err(|e| PyValueError::new_err(e.to_string()))?; + let storage = + open_storage_from_uri(uri, store).map_err(|e| PyValueError::new_err(e.to_string()))?; + // The crate's async-native loader exposes `try_new` as an + // `async fn`; the Python FFI is a sync constructor by + // contract, so we bridge with an ad-hoc current-thread tokio + // runtime here. `next()` on the returned reader is pure CPU + // and stays synchronous. + let runtime = Builder::new_current_thread() + .enable_all() + .build() + .map_err(|e| PyRuntimeError::new_err(e.to_string()))?; + let arrays_ref = arrays.as_deref(); + let reader = runtime + .block_on(ZarrChunkReader::try_new( + storage, uri, arrays_ref, batch_size, + )) .map_err(|e| PyValueError::new_err(e.to_string()))?; Ok(Self { inner: Mutex::new(Some(reader)), @@ -70,6 +90,88 @@ impl PyZarrChunkReader { } } +/// Build an `Arc` for `uri` using env-var credential +/// discovery, per scheme. This is the temporary bridge that lets the +/// Python FFI work without going through `con.read_format`; it +/// reaches into `AWS_*` / `GOOGLE_*` / `AZURE_*` environment +/// variables via the `object_store` per-backend `Builder::from_env` +/// helpers. +/// +/// **Slated for removal.** When the host wheel surfaces +/// `args.src.store` to `open_reader` via the `FFI_ObjectStore` +/// capsule machinery (in progress — see +/// ), this function (and +/// the `object_store` `aws`/`gcp`/`azure`/`http` features that back it) +/// gets deleted — the store will arrive credentialed from the host's +/// `ObjectStoreRegistry` and `PyZarrChunkReader::new` will extract it +/// from the capsule instead. For `file://` and bare paths the +/// returned store is a no-op placeholder; the loader uses +/// `FilesystemStore` directly for the local case. +fn default_object_store_for_uri(uri: &str) -> Result, PyErr> { + // file:// and bare paths use a LocalFileSystem rooted at `/`; + // open_storage_from_uri prefixes it at the group's path. This + // matches the store the host's ObjectStoreRegistry yields for + // file://, so the loader treats local and cloud uniformly. + if uri.starts_with("file://") || !uri.contains("://") { + return Ok(Arc::new(object_store::local::LocalFileSystem::new())); + } + let url = Url::parse(uri) + .map_err(|e| PyValueError::new_err(format!("group URI {uri:?} is not a valid URL: {e}")))?; + match url.scheme().to_ascii_lowercase().as_str() { + "s3" => { + use object_store::aws::AmazonS3Builder; + let store = AmazonS3Builder::from_env() + .with_url(uri) + .build() + .map_err(|e| PyValueError::new_err(build_err("s3", uri, e)))?; + Ok(Arc::new(store)) + } + "gs" | "gcs" => { + use object_store::gcp::GoogleCloudStorageBuilder; + let store = GoogleCloudStorageBuilder::from_env() + .with_url(uri) + .build() + .map_err(|e| PyValueError::new_err(build_err("gcs", uri, e)))?; + Ok(Arc::new(store)) + } + "az" | "abfs" | "abfss" => { + use object_store::azure::MicrosoftAzureBuilder; + let store = MicrosoftAzureBuilder::from_env() + .with_url(uri) + .build() + .map_err(|e| PyValueError::new_err(build_err("azure", uri, e)))?; + Ok(Arc::new(store)) + } + "http" | "https" => { + use object_store::http::HttpBuilder; + // open_storage_from_uri applies the path as a PrefixStore, + // so the HttpStore must be rooted at scheme+authority only + // — unlike S3/GCS/Azure, HttpBuilder roots at whatever URL + // it's given, so hand it the authority without the path. + let authority = format!("{}://{}", url.scheme(), url.authority()); + let store = HttpBuilder::new() + .with_url(authority) + .build() + .map_err(|e| PyValueError::new_err(build_err("http", uri, e)))?; + Ok(Arc::new(store)) + } + other => Err(PyValueError::new_err(format!( + "unsupported Zarr URI scheme {other:?}; expected one of: \ + file, s3, gs, gcs, az, abfs, abfss, http, https" + ))), + } +} + +fn build_err(backend: &str, uri: &str, err: object_store::Error) -> String { + format!( + "failed to build {backend} object_store for {uri}: {err}. \ + Provide credentials via standard environment variables \ + (AWS_ACCESS_KEY_ID/AWS_REGION for s3, GOOGLE_SERVICE_ACCOUNT_KEY \ + for gcs, AZURE_STORAGE_ACCOUNT_NAME/AZURE_STORAGE_ACCOUNT_KEY \ + for azure)." + ) +} + #[pymodule] fn _lib(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; diff --git a/rust/sedona-raster-zarr/Cargo.toml b/rust/sedona-raster-zarr/Cargo.toml index 93e946812..b21de1d70 100644 --- a/rust/sedona-raster-zarr/Cargo.toml +++ b/rust/sedona-raster-zarr/Cargo.toml @@ -33,17 +33,26 @@ result_large_err = "allow" [dependencies] arrow-array = { workspace = true } arrow-schema = { workspace = true } +async-trait = { workspace = true } +datafusion = { workspace = true, default_features = false } datafusion-common = { workspace = true } futures = { workspace = true } log = { workspace = true } +object_store = { workspace = true } sedona-common = { workspace = true } sedona-raster = { workspace = true } sedona-schema = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } -zarrs = { workspace = true, features = ["filesystem", "gzip", "zstd", "blosc", "crc32c", "sharding", "transpose"] } -zarrs_filesystem = { workspace = true } +tokio = { workspace = true } +url = { workspace = true } +zarrs = { workspace = true, features = ["async", "gzip", "zstd", "blosc", "crc32c", "sharding", "transpose"] } +zarrs_object_store = { workspace = true } [dev-dependencies] +object_store = { workspace = true, features = ["aws", "gcp", "azure", "http"] } tempfile = { workspace = true } tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } +# Test fixtures write Zarr groups to a temp dir; the loader itself +# reads everything through object_store, so this is dev-only. +zarrs_filesystem = { workspace = true } diff --git a/rust/sedona-raster-zarr/src/lib.rs b/rust/sedona-raster-zarr/src/lib.rs index 1d35d29af..523ccc763 100644 --- a/rust/sedona-raster-zarr/src/lib.rs +++ b/rust/sedona-raster-zarr/src/lib.rs @@ -33,3 +33,4 @@ mod loader; mod source_uri; pub use loader::ZarrChunkReader; +pub use source_uri::open_storage_from_uri; diff --git a/rust/sedona-raster-zarr/src/loader.rs b/rust/sedona-raster-zarr/src/loader.rs index 631565df2..b6681ac7f 100644 --- a/rust/sedona-raster-zarr/src/loader.rs +++ b/rust/sedona-raster-zarr/src/loader.rs @@ -36,11 +36,13 @@ use zarrs::array::Array; #[cfg(test)] use zarrs::array::ArrayBytes; use zarrs::group::Group; -use zarrs_filesystem::FilesystemStore; +use zarrs::storage::{ + AsyncReadableListableStorage, AsyncReadableListableStorageTraits, StorePrefix, +}; use crate::dtype::zarr_to_band_data_type; use crate::geozarr::GroupGeoMetadata; -use crate::source_uri::{build_chunk_anchor, group_uri_to_filesystem_path}; +use crate::source_uri::build_chunk_anchor; /// Streaming reader over the chunk grid of a Zarr group. /// @@ -102,7 +104,8 @@ impl ZarrChunkReader { /// `batch_size` controls how many chunk rows are emitted per /// `RecordBatch`. Must be ≥ 1; callers typically pass /// `SessionConfig::batch_size` (defaults to 8192). - pub fn try_new( + pub async fn try_new( + storage: AsyncReadableListableStorage, group_uri: &str, arrays: Option<&[String]>, batch_size: usize, @@ -113,7 +116,7 @@ impl ZarrChunkReader { geo, group_transform, spatial_dim_indices, - } = open_and_validate(group_uri, arrays)?; + } = open_and_validate(storage, group_uri, arrays).await?; let spatial_dims_names: Vec = spatial_dim_indices .iter() @@ -267,20 +270,12 @@ struct OpenedGroup { /// Open the Zarr group, parse and validate group metadata, and return /// everything `ZarrChunkReader` needs to iterate without further I/O /// (apart from per-chunk byte fetches by future resolvers). -fn open_and_validate( +async fn open_and_validate( + storage: AsyncReadableListableStorage, group_uri: &str, arrays_filter: Option<&[String]>, ) -> Result { - let fs_path = group_uri_to_filesystem_path(group_uri)?; - let store = FilesystemStore::new(&fs_path).map_err(|e| { - ArrowError::ExternalError(Box::new(sedona_internal_datafusion_err!( - "failed to open Zarr filesystem store at {}: {e}", - fs_path.display() - ))) - })?; - let storage: Arc = Arc::new(store); - - let group = Group::open(storage.clone(), "/").map_err(|e| { + let group = Group::async_open(storage.clone(), "/").await.map_err(|e| { ArrowError::ExternalError(Box::new(sedona_internal_datafusion_err!( "failed to open Zarr group at {group_uri}: {e}" ))) @@ -302,18 +297,55 @@ fn open_and_validate( ))); } - let arrays = group.child_arrays().map_err(|e| { - ArrowError::ExternalError(Box::new(sedona_internal_datafusion_err!( - "failed to enumerate child arrays in {group_uri}: {e}" - ))) - })?; - if arrays.is_empty() { - return Err(ArrowError::InvalidArgumentError(format!( - "Zarr group at {group_uri} has no child arrays" - ))); - } + let arrays = match arrays_filter { + // Explicit filter: open each named array directly, skipping the + // listing step. This is the only viable path against backends + // that don't expose directory listing (plain HTTPS without + // WebDAV, S3-via-HttpStore) and is strictly less I/O than + // list-then-filter when the user already knows what they want. + // A 1-D array named explicitly is a user error — surface it + // immediately rather than letting the spatial-dim resolver + // fail with a confusing message downstream. + Some(names) => { + let arrays = open_named_arrays(&storage, names, group_uri).await?; + for (name, array) in names.iter().zip(arrays.iter()) { + if array.shape().len() < 2 { + return Err(ArrowError::InvalidArgumentError(format!( + "array {name:?} has rank {} (shape {:?}); a raster band \ + requires at least 2 dimensions and cannot be read.", + array.shape().len(), + array.shape() + ))); + } + } + arrays + } + // Discovery: list direct children of the group and try to open + // each as an array. Per-array open failures are logged + skipped + // rather than poisoning the whole group, so a single malformed + // sibling (e.g. ITS_LIVE's U-typed coord variables with null + // fill values) doesn't take the whole read offline. 1-D arrays + // (typical xarray coord variables) are silently dropped so a + // canonical xarray layout reads cleanly. + None => { + let arrays = enumerate_child_arrays(&storage, group_uri).await?; + if arrays.is_empty() { + return Err(ArrowError::InvalidArgumentError(format!( + "Zarr group at {group_uri} has no child arrays" + ))); + } + let kept: Vec<_> = arrays.into_iter().filter(|a| a.shape().len() > 1).collect(); + if kept.is_empty() { + return Err(ArrowError::InvalidArgumentError(format!( + "Zarr group at {group_uri} contains only 1-D arrays (typical \ + xarray-style coord variables); a raster band requires at least \ + 2 dimensions, so this group has nothing readable as a raster" + ))); + } + kept + } + }; - let arrays = select_arrays(arrays, arrays_filter, group_uri)?; let array_infos = collect_array_infos(arrays)?; validate_group_constraints(&array_infos)?; @@ -367,6 +399,67 @@ fn open_and_validate( }) } +/// Open arrays at the explicit paths requested by the caller, skipping +/// the group listing step entirely. Used when `arrays_filter` is `Some` +/// — the canonical path for backends that can't list (plain HTTPS / S3 +/// behind HttpStore) and a strict subset of work for any other backend. +async fn open_named_arrays( + storage: &AsyncReadableListableStorage, + names: &[String], + group_uri: &str, +) -> Result>, ArrowError> { + let mut out = Vec::with_capacity(names.len()); + for name in names { + let normalized = name.trim_start_matches('/'); + let path = format!("/{normalized}"); + let array = Array::async_open(storage.clone(), &path) + .await + .map_err(|e| { + ArrowError::InvalidArgumentError(format!( + "Zarr group at {group_uri} has no array named {name:?} \ + (or its metadata could not be parsed): {e}" + )) + })?; + out.push(array); + } + Ok(out) +} + +/// List the direct children of the group and try to open each as an +/// array. Per-array open failures are logged at warn level and the +/// child is skipped — a single malformed sibling array (e.g. an +/// xarray-style coord variable with a dtype zarrs can't yet parse) +/// can no longer poison the whole group. Subgroups are filtered out by +/// the same `Array::async_open`-error skip, so the externally +/// observable behaviour matches the old `Group::child_arrays()` +/// contract for well-formed groups while being permissive about +/// partial breakage. +async fn enumerate_child_arrays( + storage: &AsyncReadableListableStorage, + group_uri: &str, +) -> Result>, ArrowError> { + let listing = storage.list_dir(&StorePrefix::root()).await.map_err(|e| { + ArrowError::ExternalError(Box::new(sedona_internal_datafusion_err!( + "failed to list child nodes of Zarr group at {group_uri}: {e}" + ))) + })?; + let mut arrays = Vec::with_capacity(listing.prefixes().len()); + for child in listing.prefixes() { + let raw = child.as_str().trim_end_matches('/'); + if raw.is_empty() { + continue; + } + let path = format!("/{raw}"); + match Array::async_open(storage.clone(), &path).await { + Ok(array) => arrays.push(array), + Err(e) => log::warn!( + "skipping Zarr child {path} in {group_uri}: not openable as an array ({e})" + ), + } + } + Ok(arrays) +} + /// Collect per-array metadata from open zarrs `Array` handles. /// /// Sorts arrays by path so band ordering across rows is deterministic @@ -374,7 +467,7 @@ fn open_and_validate( /// filesystem stores currently happen to enumerate alphabetically, but /// that's not part of the contract we want consumers to rely on). fn collect_array_infos( - mut arrays: Vec>, + mut arrays: Vec>, ) -> Result, ArrowError> { arrays.sort_by(|a, b| a.path().as_str().cmp(b.path().as_str())); let mut out = Vec::with_capacity(arrays.len()); @@ -411,69 +504,6 @@ fn collect_array_infos( Ok(out) } -/// Apply the array-selection rules. 1-D arrays (typical xarray-style -/// coord variables) are always dropped — a raster band requires at -/// least 2 dimensions, so reading a 1-D array could never succeed. -/// - Explicit filter: keep arrays whose path (with leading `/` stripped) -/// matches one of the requested names. Unknown names error so users -/// don't silently get an empty group from a typo; naming a 1-D array -/// errors with a clear message rather than producing a confusing -/// "no spatial axes" failure downstream. -/// - No filter: read every multi-dimensional array. -fn select_arrays( - arrays: Vec>, - filter: Option<&[String]>, - group_uri: &str, -) -> Result>, ArrowError> { - if let Some(names) = filter { - let available: Vec = arrays - .iter() - .map(|a| a.path().as_str().trim_start_matches('/').to_string()) - .collect(); - for requested in names { - let needle = requested.trim_start_matches('/'); - match arrays - .iter() - .find(|a| a.path().as_str().trim_start_matches('/') == needle) - { - None => { - return Err(ArrowError::InvalidArgumentError(format!( - "Zarr group at {group_uri} has no array named {requested:?}; \ - available arrays: {available:?}" - ))); - } - Some(a) if a.shape().len() < 2 => { - return Err(ArrowError::InvalidArgumentError(format!( - "array {requested:?} has rank {} (shape {:?}); a raster band \ - requires at least 2 dimensions and cannot be read.", - a.shape().len(), - a.shape() - ))); - } - Some(_) => {} - } - } - let kept: Vec<_> = arrays - .into_iter() - .filter(|a| { - let path = a.path().as_str().trim_start_matches('/').to_string(); - names.iter().any(|n| n.trim_start_matches('/') == path) - }) - .collect(); - return Ok(kept); - } - - let kept: Vec<_> = arrays.into_iter().filter(|a| a.shape().len() > 1).collect(); - if kept.is_empty() { - return Err(ArrowError::InvalidArgumentError(format!( - "Zarr group at {group_uri} contains only 1-D arrays (typical \ - xarray-style coord variables); a raster band requires at least \ - 2 dimensions, so this group has nothing readable as a raster" - ))); - } - Ok(kept) -} - /// Resolve dimension names for an array, supporting both Zarr v3 /// (first-class `dimension_names` field) and Zarr v2 with the xarray /// `_ARRAY_DIMENSIONS` attribute. Errors if neither carries a complete @@ -660,10 +690,10 @@ fn advance_chunk_indices(chunk_indices: &mut [u64], chunk_grid_shape: &[u64]) -> /// exercises it so the implementation doesn't bit-rot in the /// meantime. #[cfg(test)] -fn retrieve_chunk_bytes( - array: &Array, - chunk_indices: &[u64], -) -> Result, ArrowError> { +fn retrieve_chunk_bytes(array: &Array, chunk_indices: &[u64]) -> Result, ArrowError> +where + S: ?Sized + zarrs::storage::ReadableStorageTraits + 'static, +{ let bytes = array .retrieve_chunk::>(chunk_indices) .map_err(|e| { @@ -689,6 +719,7 @@ mod tests { use tempfile::TempDir; use zarrs::array::data_type; use zarrs::array::ArrayBuilder; + use zarrs_filesystem::FilesystemStore; /// Direct coverage for `retrieve_chunk_bytes`. The function is the /// only pixel-byte read primitive in the crate today; previously it diff --git a/rust/sedona-raster-zarr/src/source_uri.rs b/rust/sedona-raster-zarr/src/source_uri.rs index 6bd984854..024b4e2ba 100644 --- a/rust/sedona-raster-zarr/src/source_uri.rs +++ b/rust/sedona-raster-zarr/src/source_uri.rs @@ -15,30 +15,36 @@ // specific language governing permissions and limitations // under the License. -//! Zarr URI helpers — group locators and per-chunk OutDb anchors. +//! Zarr URI helpers — store resolution and per-chunk OutDb anchors. //! //! Two URI shapes flow through the loader: //! //! 1. **Group URI** (the loader entry-point argument). Identifies a Zarr -//! group on a backend, e.g. `file:///tmp/datacube.zarr` or a bare -//! local path. Cloud schemes (`s3://`, `gs://`, `az://`, `https://`) -//! will be accepted by a future revision; today only `file://` and -//! bare-path are recognised. +//! group on a backend. Supported schemes: `file://`, bare local path, +//! `s3://`, `gs://`/`gcs://`, `az://`/`abfs://`/`abfss://`, +//! `http://`, `https://`. //! 2. **Chunk anchor URI** (written into a band's `outdb_uri`). Addresses //! one chunk in one array within a group: //! `#array=&chunk=,,...`. The store URI //! is the original group URI verbatim (whatever scheme that uses); //! array path and chunk indices both live in the fragment so the //! store URI is unambiguous even when both contain `/` (e.g. -//! `s3://bucket/foo.zarr/2024` + `subgroup/B01`). Inner-chunk indices -//! always — sharding is a storage detail the resolver handles. +//! `s3://bucket/foo.zarr/2024` + `subgroup/B01`). //! //! The "this is a zarr anchor" signal lives in the band's //! `outdb_format = "zarr"` field, not in a URI scheme prefix — matches //! the GDAL OutDb convention and lets the format-keyed dispatcher //! route without parsing URI schemes. +use std::sync::Arc; + use arrow_schema::ArrowError; +use object_store::path::Path as ObjectPath; +use object_store::prefix::PrefixStore; +use object_store::ObjectStore; +use url::Url; +use zarrs::storage::AsyncReadableListableStorage; +use zarrs_object_store::AsyncObjectStore; /// Parts of a chunk-anchor URI. /// @@ -48,14 +54,8 @@ use arrow_schema::ArrowError; #[cfg(test)] #[derive(Debug, Clone, PartialEq, Eq)] pub struct ChunkAnchor { - /// Original store URI for the *group* (e.g. `file:///tmp/foo.zarr`, - /// `s3://bucket/foo.zarr/2024`). Same value across every chunk of every - /// array in a group. pub store_uri: String, - /// Array's path within the store (e.g. `temperature`, `subgroup/B01`). pub array_path: String, - /// Chunk's position in the array's inner chunk grid (one index per - /// array dimension). pub chunk_indices: Vec, } @@ -138,24 +138,44 @@ pub fn parse_chunk_anchor(uri: &str) -> Result { }) } -/// Normalize a user-supplied group URI into a local filesystem path. +/// Resolve a group URI into an async zarrs storage handle rooted at +/// the group, given a credentialed [`object_store::ObjectStore`] for +/// the URI. /// -/// Only `file://` and bare-path URIs are supported. Cloud schemes -/// (`s3://`, `gs://`, `az://`, `https://`) error with a clear message. -pub fn group_uri_to_filesystem_path(uri: &str) -> Result { - if let Some(rest) = uri.strip_prefix("file://") { - return Ok(std::path::PathBuf::from(rest)); - } - for scheme in ["s3://", "gs://", "az://", "https://", "http://"] { - if uri.starts_with(scheme) { - return Err(ArrowError::NotYetImplemented(format!( - "cloud Zarr stores ({scheme}…) are not supported yet; \ - use a local filesystem path or `file://` URI" - ))); - } - } - // Bare path. - Ok(std::path::PathBuf::from(uri)) +/// **Contract:** the caller passes a `store` rooted at the URI's +/// scheme + authority — exactly what +/// [`datafusion_execution::object_store::ObjectStoreRegistry::get_store`] +/// returns (e.g. for `s3://bucket/path/foo.zarr` the store is rooted at +/// `s3://bucket`; for `file:///data/foo.zarr` it's a +/// `LocalFileSystem` rooted at `/`). This helper then wraps it in a +/// [`PrefixStore`] at the URL's path so the returned storage is rooted +/// at the zarr group. Bare paths are treated as `file://`. +/// +/// The scheme itself is opaque here: any backend whose `store` honours +/// the rooting contract works without a per-scheme branch. Schemes the +/// caller can't produce a store for never reach this function. +/// +/// Returns storage rooted at the group: callers invoke +/// `Group::async_open(storage, "/").await`. +pub fn open_storage_from_uri( + uri: &str, + store: Arc, +) -> Result { + // Bare local paths aren't valid URLs; coerce them to `file://` so + // the path lands in `url.path()` like every other scheme. + let url = Url::parse(uri) + .or_else(|_| Url::parse(&format!("file://{uri}"))) + .map_err(|e| { + ArrowError::InvalidArgumentError(format!("group URI {uri:?} is not a valid URL: {e}")) + })?; + + let prefix = url.path().trim_start_matches('/').to_string(); + let prefixed: Arc = if prefix.is_empty() { + store + } else { + Arc::new(PrefixStore::new(store, ObjectPath::from(prefix))) + }; + Ok(Arc::new(AsyncObjectStore::new(prefixed))) } #[cfg(test)] @@ -206,8 +226,6 @@ mod tests { #[test] fn parse_anchor_rejects_missing_fragment() { - // No `#` means no fragment, so neither `array=` nor `chunk=` is - // present. The parser surfaces "missing fragment". let err = parse_chunk_anchor("file:///foo.zarr") .unwrap_err() .to_string(); @@ -248,8 +266,6 @@ mod tests { #[test] fn parse_anchor_ignores_unknown_fragment_params() { - // Forward-compatible: extra `&key=value` pairs in the fragment - // shouldn't break parsing — they're reserved for future use. let anchor = parse_chunk_anchor("file:///foo.zarr#array=t&chunk=0,0&version=v3").unwrap(); assert_eq!(anchor.array_path, "t"); assert_eq!(anchor.chunk_indices, vec![0, 0]); @@ -271,24 +287,31 @@ mod tests { assert_eq!(parsed, original); } + fn placeholder_store() -> Arc { + Arc::new(object_store::memory::InMemory::new()) + } + #[test] - fn group_uri_file_scheme() { - let path = group_uri_to_filesystem_path("file:///tmp/foo.zarr").unwrap(); - assert_eq!(path, std::path::PathBuf::from("/tmp/foo.zarr")); + fn open_storage_wraps_cloud_store_with_prefix() { + // An InMemory store stands in for a bucket-rooted cloud store; + // the s3:// path component should be applied as a PrefixStore. + let storage = open_storage_from_uri("s3://bucket/path/foo.zarr", placeholder_store()) + .expect("cloud URI builds storage from a passed-in store"); + assert!(Arc::strong_count(&storage) >= 1); } #[test] - fn group_uri_bare_path() { - let path = group_uri_to_filesystem_path("/tmp/foo.zarr").unwrap(); - assert_eq!(path, std::path::PathBuf::from("/tmp/foo.zarr")); + fn open_storage_accepts_file_scheme() { + let storage = open_storage_from_uri("file:///nonexistent/foo.zarr", placeholder_store()) + .expect("file:// URI builds storage"); + assert!(Arc::strong_count(&storage) >= 1); } #[test] - fn group_uri_cloud_scheme_errors() { - let err = group_uri_to_filesystem_path("s3://bucket/foo.zarr") - .unwrap_err() - .to_string(); - assert!(err.contains("s3://"), "{err}"); - assert!(err.contains("not supported"), "{err}"); + fn open_storage_accepts_bare_path() { + // Bare paths are coerced to file:// rather than rejected. + let storage = open_storage_from_uri("/nonexistent/foo.zarr", placeholder_store()) + .expect("bare path builds storage"); + assert!(Arc::strong_count(&storage) >= 1); } } diff --git a/rust/sedona-raster-zarr/tests/cloud.rs b/rust/sedona-raster-zarr/tests/cloud.rs new file mode 100644 index 000000000..2e75af231 --- /dev/null +++ b/rust/sedona-raster-zarr/tests/cloud.rs @@ -0,0 +1,127 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Cloud-backed smoke tests for `ZarrChunkReader`. +//! +//! These tests reach the network and are `#[ignore]` by default. Both +//! target the public, anonymous ITS_LIVE v2 ice-velocity datacubes +//! (NASA MEaSUREs), hosted at `s3://its-live-data/` in `us-west-2`. The +//! dataset is a Zarr v2 group whose data arrays declare +//! `dimension_names = ['mid_date', 'y', 'x']` — passing the loader's +//! default spatial-dim policy — and whose coordinate variables include +//! a few short fixed-length Unicode (`/` ↔ +/// `https://.s3.us-west-2.amazonaws.com/`. +const ITS_LIVE_BUCKET: &str = "its-live-data"; +const ITS_LIVE_KEY: &str = + "datacubes/v2/N40W120/ITS_LIVE_vel_EPSG32610_G0120_X250000_Y5450000.zarr"; + +/// Arrays known to share the canonical (mid_date, y, x) layout and +/// chunk grid in the ITS_LIVE v2 datacubes. The same group also holds +/// arrays with incompatible chunk grids (e.g. `floatingice`) and +/// U-typed coord variables zarrs can't currently open, so both smokes +/// pin the read to this whitelist instead of relying on discovery. +const ITS_LIVE_ARRAYS: &[&str] = &["M11", "M12"]; + +fn count_rows(reader: ZarrChunkReader) -> usize { + let mut rows = 0; + for batch in reader { + let batch = batch.expect("batch read ok"); + let s = batch + .column(0) + .as_any() + .downcast_ref::() + .expect("raster column is a StructArray"); + rows += RasterStructArray::new(s).len(); + } + rows +} + +fn its_live_arrays() -> Vec { + ITS_LIVE_ARRAYS.iter().map(|s| (*s).into()).collect() +} + +/// `s3://its-live-data/...` via [`object_store::aws::AmazonS3Builder`]. +/// +/// Requires AWS credentials in env (`AWS_REGION`, `AWS_ACCESS_KEY_ID`, +/// `AWS_SECRET_ACCESS_KEY`) — or for anonymous public reads, +/// `AWS_SKIP_SIGNATURE=true` plus `AWS_REGION=us-west-2`. +#[tokio::test] +#[ignore] +async fn s3_zarr_smoke() { + let uri = format!("s3://{ITS_LIVE_BUCKET}/{ITS_LIVE_KEY}"); + let arrays = its_live_arrays(); + let store: Arc = Arc::new( + AmazonS3Builder::from_env() + .with_url(&uri) + .build() + .expect("build AmazonS3 store from env"), + ); + let storage = open_storage_from_uri(&uri, store).expect("open_storage_from_uri"); + let reader = ZarrChunkReader::try_new(storage, &uri, Some(&arrays), 1024) + .await + .expect("ZarrChunkReader::try_new against ITS_LIVE on s3://"); + let rows = count_rows(reader); + assert!(rows > 0, "expected at least one chunk row from {uri}"); +} + +/// `https://...s3.us-west-2.amazonaws.com/...` via +/// [`object_store::http::HttpStore`]. Same bucket, different URI scheme +/// and different storage backend — exercises the HTTPS code path +/// without depending on `PROPFIND`-style listing, which AWS S3 doesn't +/// support. +#[tokio::test] +#[ignore] +async fn https_zarr_smoke() { + let uri = format!("https://{ITS_LIVE_BUCKET}.s3.us-west-2.amazonaws.com/{ITS_LIVE_KEY}"); + let arrays = its_live_arrays(); + // open_storage_from_uri expects a store rooted at scheme+authority + // and applies the path as a PrefixStore itself, so build the + // HttpStore against the bucket host only — not the full key. + let authority = format!("https://{ITS_LIVE_BUCKET}.s3.us-west-2.amazonaws.com"); + let store: Arc = Arc::new( + HttpBuilder::new() + .with_url(authority) + .build() + .expect("build HttpStore"), + ); + let storage = open_storage_from_uri(&uri, store).expect("open_storage_from_uri"); + let reader = ZarrChunkReader::try_new(storage, &uri, Some(&arrays), 1024) + .await + .expect("ZarrChunkReader::try_new against ITS_LIVE on https://"); + let rows = count_rows(reader); + assert!(rows > 0, "expected at least one chunk row from {uri}"); +} diff --git a/rust/sedona-raster-zarr/tests/zarr_roundtrip.rs b/rust/sedona-raster-zarr/tests/zarr_roundtrip.rs index 7da73b753..0f3800e61 100644 --- a/rust/sedona-raster-zarr/tests/zarr_roundtrip.rs +++ b/rust/sedona-raster-zarr/tests/zarr_roundtrip.rs @@ -30,13 +30,20 @@ use arrow_array::StructArray; use arrow_schema::ArrowError; use sedona_raster::array::RasterStructArray; use sedona_raster::traits::RasterRef; -use sedona_raster_zarr::ZarrChunkReader; +use sedona_raster_zarr::{open_storage_from_uri, ZarrChunkReader}; /// Drain a `ZarrChunkReader` into a single `StructArray`. Fixtures in /// this file are small (≤8 chunk rows) so they fit in one batch with a /// generous batch_size. Anything bigger would need `arrow::compute::concat`. -fn read_all(uri: &str, arrays: Option<&[String]>) -> Result { - let reader = ZarrChunkReader::try_new(uri, arrays, 1024)?; +async fn read_all(uri: &str, arrays: Option<&[String]>) -> Result { + // Fixtures are file:// URIs rooted at an absolute temp path. Pass a + // LocalFileSystem rooted at `/` — the same shape the host's + // ObjectStoreRegistry yields for file:// — and let + // open_storage_from_uri prefix it at the group's path. + let store: Arc = + Arc::new(object_store::local::LocalFileSystem::new()); + let storage = open_storage_from_uri(uri, store)?; + let reader = ZarrChunkReader::try_new(storage, uri, arrays, 1024).await?; let batches: Vec<_> = reader.collect::, _>>()?; assert!( batches.len() <= 1, @@ -108,11 +115,11 @@ fn build_fixture() -> TempDir { tmp } -#[test] -fn round_trip_emits_one_row_per_chunk_position_with_outdb_anchors() { +#[tokio::test] +async fn round_trip_emits_one_row_per_chunk_position_with_outdb_anchors() { let tmp = build_fixture(); let uri = format!("file://{}", tmp.path().display()); - let arr = read_all(&uri, None).unwrap(); + let arr = read_all(&uri, None).await.unwrap(); let rasters = RasterStructArray::new(&arr); assert_eq!(rasters.len(), 8, "expected 8 chunk rows (2*2*2)"); @@ -158,8 +165,8 @@ fn round_trip_emits_one_row_per_chunk_position_with_outdb_anchors() { assert!(anchor.contains("&chunk=1,1,1"), "got: {anchor}"); } -#[test] -fn errors_on_empty_group() { +#[tokio::test] +async fn errors_on_empty_group() { let tmp = TempDir::new().unwrap(); let store = Arc::new(FilesystemStore::new(tmp.path()).unwrap()); GroupBuilder::new() @@ -168,7 +175,7 @@ fn errors_on_empty_group() { .store_metadata() .unwrap(); let uri = format!("file://{}", tmp.path().display()); - let err = read_all(&uri, None).unwrap_err().to_string(); + let err = read_all(&uri, None).await.unwrap_err().to_string(); assert!(err.contains("no child arrays"), "got: {err}"); } @@ -212,11 +219,11 @@ fn build_xarray_style_fixture() -> TempDir { tmp } -#[test] -fn auto_skips_1d_coord_variables() { +#[tokio::test] +async fn auto_skips_1d_coord_variables() { let tmp = build_xarray_style_fixture(); let uri = format!("file://{}", tmp.path().display()); - let arr = read_all(&uri, None).unwrap(); + let arr = read_all(&uri, None).await.unwrap(); let rasters = RasterStructArray::new(&arr); // 2*2*2 = 8 chunk positions, with 2 bands per row (pressure, temperature). assert_eq!(rasters.len(), 8); @@ -224,30 +231,30 @@ fn auto_skips_1d_coord_variables() { assert_eq!(r0.num_bands(), 2); } -#[test] -fn explicit_arrays_filter_selects_subset() { +#[tokio::test] +async fn explicit_arrays_filter_selects_subset() { let tmp = build_xarray_style_fixture(); let uri = format!("file://{}", tmp.path().display()); let filter = vec!["temperature".to_string()]; - let arr = read_all(&uri, Some(&filter)).unwrap(); + let arr = read_all(&uri, Some(&filter)).await.unwrap(); let rasters = RasterStructArray::new(&arr); assert_eq!(rasters.len(), 8); let r0 = rasters.get(0).unwrap(); assert_eq!(r0.num_bands(), 1, "only temperature should be read"); } -#[test] -fn explicit_arrays_filter_rejects_unknown_name() { +#[tokio::test] +async fn explicit_arrays_filter_rejects_unknown_name() { let tmp = build_xarray_style_fixture(); let uri = format!("file://{}", tmp.path().display()); let filter = vec!["humidity".to_string()]; - let err = read_all(&uri, Some(&filter)).unwrap_err().to_string(); + let err = read_all(&uri, Some(&filter)).await.unwrap_err().to_string(); assert!(err.contains("humidity"), "got: {err}"); assert!(err.contains("no array named"), "got: {err}"); } -#[test] -fn errors_when_crs_declared_without_transform() { +#[tokio::test] +async fn errors_when_crs_declared_without_transform() { // CRS-without-transform is almost certainly malformed metadata — // the user thinks they have full georef but downstream spatial // joins would silently use the identity pixel transform. The @@ -270,27 +277,27 @@ fn errors_when_crs_declared_without_transform() { .unwrap(); let uri = format!("file://{}", tmp.path().display()); - let err = read_all(&uri, None).unwrap_err().to_string(); + let err = read_all(&uri, None).await.unwrap_err().to_string(); assert!(err.contains("CRS"), "got: {err}"); assert!(err.contains("spatial:transform"), "got: {err}"); } -#[test] -fn explicit_arrays_filter_rejects_1d_arrays() { +#[tokio::test] +async fn explicit_arrays_filter_rejects_1d_arrays() { // A user explicitly naming a 1-D array gets a clear "needs 2 dims" // error at parse time, not a confusing downstream spatial-dim // resolution failure. let tmp = build_xarray_style_fixture(); let uri = format!("file://{}", tmp.path().display()); let filter = vec!["t".to_string()]; - let err = read_all(&uri, Some(&filter)).unwrap_err().to_string(); + let err = read_all(&uri, Some(&filter)).await.unwrap_err().to_string(); assert!(err.contains("\"t\""), "got: {err}"); assert!(err.contains("rank 1"), "got: {err}"); assert!(err.contains("at least 2 dimensions"), "got: {err}"); } -#[test] -fn errors_when_group_has_only_1d_arrays() { +#[tokio::test] +async fn errors_when_group_has_only_1d_arrays() { let tmp = TempDir::new().unwrap(); let store = Arc::new(FilesystemStore::new(tmp.path()).unwrap()); GroupBuilder::new() @@ -306,12 +313,12 @@ fn errors_when_group_has_only_1d_arrays() { .unwrap(); let uri = format!("file://{}", tmp.path().display()); - let err = read_all(&uri, None).unwrap_err().to_string(); + let err = read_all(&uri, None).await.unwrap_err().to_string(); assert!(err.contains("only 1-D arrays"), "got: {err}"); } -#[test] -fn falls_back_to_array_dimensions_attribute() { +#[tokio::test] +async fn falls_back_to_array_dimensions_attribute() { // Simulates a Zarr v2 array (or any v3 array that lacks a first-class // `dimension_names` field) by leaving `.dimension_names(None)` and // setting xarray's `_ARRAY_DIMENSIONS` attribute instead. The loader @@ -334,7 +341,7 @@ fn falls_back_to_array_dimensions_attribute() { array.store_metadata().unwrap(); let uri = format!("file://{}", tmp.path().display()); - let arr = read_all(&uri, None).unwrap(); + let arr = read_all(&uri, None).await.unwrap(); let rasters = RasterStructArray::new(&arr); assert_eq!(rasters.len(), 2); let r0 = rasters.get(0).unwrap(); @@ -345,8 +352,8 @@ fn falls_back_to_array_dimensions_attribute() { assert!(band.outdb_uri().unwrap().contains("#array=temperature")); } -#[test] -fn errors_on_mismatched_chunk_grids() { +#[tokio::test] +async fn errors_on_mismatched_chunk_grids() { let tmp = TempDir::new().unwrap(); let store = Arc::new(FilesystemStore::new(tmp.path()).unwrap()); GroupBuilder::new() @@ -368,7 +375,7 @@ fn errors_on_mismatched_chunk_grids() { .unwrap(); let uri = format!("file://{}", tmp.path().display()); - let err = read_all(&uri, None).unwrap_err().to_string(); + let err = read_all(&uri, None).await.unwrap_err().to_string(); assert!( err.contains("chunk") && err.contains("array_a") && err.contains("array_b"), "got: {err}" From 2130a94883e42b4ad02489a59ceafde7fc1a3b87 Mon Sep 17 00:00:00 2001 From: jameswillis Date: Tue, 2 Jun 2026 09:46:23 -0700 Subject: [PATCH 2/4] refactor(sedonadb-zarr): share one tokio runtime across reader opens PyZarrChunkReader::new built a fresh current-thread tokio runtime on every open to bridge the async loader constructor into the sync Python FFI. Replace it with a process-wide OnceLock so the package builds the runtime once and reuses it. next() on the reader is pure CPU and never touches it. --- python/sedonadb-zarr/src/lib.rs | 33 ++++++++++++++++++++++----------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/python/sedonadb-zarr/src/lib.rs b/python/sedonadb-zarr/src/lib.rs index 8d2d9c135..2d752640a 100644 --- a/python/sedonadb-zarr/src/lib.rs +++ b/python/sedonadb-zarr/src/lib.rs @@ -20,7 +20,7 @@ //! Python-side `ZarrFormatSpec`. use std::ffi::CString; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, OnceLock}; use arrow_array::ffi_stream::FFI_ArrowArrayStream; use object_store::ObjectStore; @@ -28,9 +28,25 @@ use pyo3::exceptions::{PyRuntimeError, PyValueError}; use pyo3::prelude::*; use pyo3::types::PyCapsule; use sedona_raster_zarr::{open_storage_from_uri, ZarrChunkReader}; -use tokio::runtime::Builder; +use tokio::runtime::{Builder, Runtime}; use url::Url; +/// Process-wide tokio runtime backing the sync Python FFI bridge. +/// +/// `ZarrChunkReader::try_new` is async, but the Python constructor is sync +/// by contract, so we `block_on` here. Building a runtime per reader is +/// wasteful; one shared runtime serves every open in the package. `next()` +/// on the returned reader is pure CPU and never touches this runtime. +fn shared_runtime() -> &'static Runtime { + static RUNTIME: OnceLock = OnceLock::new(); + RUNTIME.get_or_init(|| { + Builder::new_current_thread() + .enable_all() + .build() + .expect("build sedonadb-zarr tokio runtime") + }) +} + /// Single-use `__arrow_c_stream__` wrapper around `ZarrChunkReader`. #[pyclass] pub struct PyZarrChunkReader { @@ -47,16 +63,11 @@ impl PyZarrChunkReader { let storage = open_storage_from_uri(uri, store).map_err(|e| PyValueError::new_err(e.to_string()))?; // The crate's async-native loader exposes `try_new` as an - // `async fn`; the Python FFI is a sync constructor by - // contract, so we bridge with an ad-hoc current-thread tokio - // runtime here. `next()` on the returned reader is pure CPU - // and stays synchronous. - let runtime = Builder::new_current_thread() - .enable_all() - .build() - .map_err(|e| PyRuntimeError::new_err(e.to_string()))?; + // `async fn`; the Python FFI is a sync constructor by contract, + // so we bridge by blocking on the shared package runtime. + // `next()` on the returned reader is pure CPU and stays synchronous. let arrays_ref = arrays.as_deref(); - let reader = runtime + let reader = shared_runtime() .block_on(ZarrChunkReader::try_new( storage, uri, arrays_ref, batch_size, )) From c0b510aa08603ffb6c8cb9118fd6015205014fdc Mon Sep 17 00:00:00 2001 From: jameswillis Date: Tue, 2 Jun 2026 09:46:23 -0700 Subject: [PATCH 3/4] chore(sedona-raster-zarr): document cloud test dataset; drop unused deps Add the NASA JPL ITS_LIVE project/data URL (https://its-live.jpl.nasa.gov/) next to the cloud smoke-test bucket constants so the dataset the #[ignore] tests hit is self-documenting. Drop the `datafusion` umbrella and `async-trait` dependencies: neither is referenced anywhere in the crate. The only datafusion use is the sedona_internal_datafusion_err! macro, which expands to datafusion_common::DataFusionError and is satisfied by the retained datafusion-common dependency. Shrinks the compile and the wheel. --- Cargo.lock | 2 -- rust/sedona-raster-zarr/Cargo.toml | 2 -- rust/sedona-raster-zarr/tests/cloud.rs | 6 +++++- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1e27cb5da..c9f0586bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6162,8 +6162,6 @@ version = "0.4.0" dependencies = [ "arrow-array", "arrow-schema", - "async-trait", - "datafusion", "datafusion-common", "futures", "log", diff --git a/rust/sedona-raster-zarr/Cargo.toml b/rust/sedona-raster-zarr/Cargo.toml index b21de1d70..2c9fe7efd 100644 --- a/rust/sedona-raster-zarr/Cargo.toml +++ b/rust/sedona-raster-zarr/Cargo.toml @@ -33,8 +33,6 @@ result_large_err = "allow" [dependencies] arrow-array = { workspace = true } arrow-schema = { workspace = true } -async-trait = { workspace = true } -datafusion = { workspace = true, default_features = false } datafusion-common = { workspace = true } futures = { workspace = true } log = { workspace = true } diff --git a/rust/sedona-raster-zarr/tests/cloud.rs b/rust/sedona-raster-zarr/tests/cloud.rs index 2e75af231..19c08dd7f 100644 --- a/rust/sedona-raster-zarr/tests/cloud.rs +++ b/rust/sedona-raster-zarr/tests/cloud.rs @@ -43,7 +43,11 @@ use object_store::ObjectStore; use sedona_raster::array::RasterStructArray; use sedona_raster_zarr::{open_storage_from_uri, ZarrChunkReader}; -/// ITS_LIVE bucket layout: `s3:///` ↔ +/// NASA MEaSUREs ITS_LIVE global glacier ice-velocity datacubes — public, +/// anonymous, in `s3://its-live-data/` (us-west-2). Project and data docs: +/// . +/// +/// Bucket layout: `s3:///` ↔ /// `https://.s3.us-west-2.amazonaws.com/`. const ITS_LIVE_BUCKET: &str = "its-live-data"; const ITS_LIVE_KEY: &str = From 63ab2e54bfb1e31493de222965ed56ae8391bbc0 Mon Sep 17 00:00:00 2001 From: jameswillis Date: Wed, 3 Jun 2026 10:54:28 -0700 Subject: [PATCH 4/4] chore(sedonadb-zarr): drop untested gcs/azure object_store openers The temporary env-var object_store bridge in the Python shim wired up s3, gcs, azure, and http openers, but only s3 and http/https have test coverage (the ITS_LIVE cloud smokes). Drop the gcs and azure match arms and their object_store `gcp`/`azure` features here and in the sedona-raster-zarr dev-dependency. The whole bridge is slated for removal once the host surfaces a credentialed store via the ObjectStore FFI capsule (#890); gcs/azure can come back with coverage if needed before then. --- python/sedonadb-zarr/Cargo.toml | 2 +- python/sedonadb-zarr/src/lib.rs | 37 +++++++++--------------------- rust/sedona-raster-zarr/Cargo.toml | 2 +- 3 files changed, 13 insertions(+), 28 deletions(-) diff --git a/python/sedonadb-zarr/Cargo.toml b/python/sedonadb-zarr/Cargo.toml index 072e652ad..e819cb1ef 100644 --- a/python/sedonadb-zarr/Cargo.toml +++ b/python/sedonadb-zarr/Cargo.toml @@ -29,7 +29,7 @@ doc = false [dependencies] arrow-array = { workspace = true, features = ["ffi"] } -object_store = { workspace = true, features = ["aws", "gcp", "azure", "http"] } +object_store = { workspace = true, features = ["aws", "http"] } pyo3 = { version = "0.25.1" } sedona-raster-zarr = { workspace = true } tokio = { workspace = true, features = ["rt"] } diff --git a/python/sedonadb-zarr/src/lib.rs b/python/sedonadb-zarr/src/lib.rs index 2d752640a..97dead268 100644 --- a/python/sedonadb-zarr/src/lib.rs +++ b/python/sedonadb-zarr/src/lib.rs @@ -104,15 +104,18 @@ impl PyZarrChunkReader { /// Build an `Arc` for `uri` using env-var credential /// discovery, per scheme. This is the temporary bridge that lets the /// Python FFI work without going through `con.read_format`; it -/// reaches into `AWS_*` / `GOOGLE_*` / `AZURE_*` environment -/// variables via the `object_store` per-backend `Builder::from_env` -/// helpers. +/// reaches into `AWS_*` environment variables via the `object_store` +/// per-backend `Builder::from_env` helpers. +/// +/// Only the schemes exercised by the crate's tests are wired up (`s3`, +/// `http`/`https`); `gcs`/`azure` are intentionally omitted until there +/// is coverage for them. /// /// **Slated for removal.** When the host wheel surfaces /// `args.src.store` to `open_reader` via the `FFI_ObjectStore` /// capsule machinery (in progress — see /// ), this function (and -/// the `object_store` `aws`/`gcp`/`azure`/`http` features that back it) +/// the `object_store` `aws`/`http` features that back it) /// gets deleted — the store will arrive credentialed from the host's /// `ObjectStoreRegistry` and `PyZarrChunkReader::new` will extract it /// from the capsule instead. For `file://` and bare paths the @@ -137,28 +140,12 @@ fn default_object_store_for_uri(uri: &str) -> Result, PyErr .map_err(|e| PyValueError::new_err(build_err("s3", uri, e)))?; Ok(Arc::new(store)) } - "gs" | "gcs" => { - use object_store::gcp::GoogleCloudStorageBuilder; - let store = GoogleCloudStorageBuilder::from_env() - .with_url(uri) - .build() - .map_err(|e| PyValueError::new_err(build_err("gcs", uri, e)))?; - Ok(Arc::new(store)) - } - "az" | "abfs" | "abfss" => { - use object_store::azure::MicrosoftAzureBuilder; - let store = MicrosoftAzureBuilder::from_env() - .with_url(uri) - .build() - .map_err(|e| PyValueError::new_err(build_err("azure", uri, e)))?; - Ok(Arc::new(store)) - } "http" | "https" => { use object_store::http::HttpBuilder; // open_storage_from_uri applies the path as a PrefixStore, // so the HttpStore must be rooted at scheme+authority only - // — unlike S3/GCS/Azure, HttpBuilder roots at whatever URL - // it's given, so hand it the authority without the path. + // — unlike S3, HttpBuilder roots at whatever URL it's given, + // so hand it the authority without the path. let authority = format!("{}://{}", url.scheme(), url.authority()); let store = HttpBuilder::new() .with_url(authority) @@ -168,7 +155,7 @@ fn default_object_store_for_uri(uri: &str) -> Result, PyErr } other => Err(PyValueError::new_err(format!( "unsupported Zarr URI scheme {other:?}; expected one of: \ - file, s3, gs, gcs, az, abfs, abfss, http, https" + file, s3, http, https" ))), } } @@ -177,9 +164,7 @@ fn build_err(backend: &str, uri: &str, err: object_store::Error) -> String { format!( "failed to build {backend} object_store for {uri}: {err}. \ Provide credentials via standard environment variables \ - (AWS_ACCESS_KEY_ID/AWS_REGION for s3, GOOGLE_SERVICE_ACCOUNT_KEY \ - for gcs, AZURE_STORAGE_ACCOUNT_NAME/AZURE_STORAGE_ACCOUNT_KEY \ - for azure)." + (e.g. AWS_ACCESS_KEY_ID/AWS_REGION for s3)." ) } diff --git a/rust/sedona-raster-zarr/Cargo.toml b/rust/sedona-raster-zarr/Cargo.toml index 2c9fe7efd..670449839 100644 --- a/rust/sedona-raster-zarr/Cargo.toml +++ b/rust/sedona-raster-zarr/Cargo.toml @@ -48,7 +48,7 @@ zarrs = { workspace = true, features = ["async", "gzip", "zstd", "blosc", "crc32 zarrs_object_store = { workspace = true } [dev-dependencies] -object_store = { workspace = true, features = ["aws", "gcp", "azure", "http"] } +object_store = { workspace = true, features = ["aws", "http"] } tempfile = { workspace = true } tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } # Test fixtures write Zarr groups to a temp dir; the loader itself