diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml index 516ae771d..f925c40d7 100644 --- a/.github/workflows/python.yml +++ b/.github/workflows/python.yml @@ -119,6 +119,10 @@ jobs: # Keep this export in sync with the export in dev/release/verify-release-candidate.sh export MATURIN_PEP517_ARGS="--features s2geography" pip install -e "python/sedonadb/[test]" -vv + # Unset so `--features s2geography` (sedonadb-only) doesn't + # carry into the plugin install. + unset MATURIN_PEP517_ARGS + pip install -e "python/sedonadb-zarr/[test]" -vv - name: Download minimal geoarrow-data assets run: | diff --git a/Cargo.lock b/Cargo.lock index 04c768a85..f21487435 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -553,6 +553,17 @@ dependencies = [ "abi_stable", ] +[[package]] +name = "async-lock" +version = "3.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "290f7f2596bd5b78a9fec8088ccd89180d7f9f55b94b0576823bbbdc72ee8311" +dependencies = [ + "event-listener", + "event-listener-strategy", + "pin-project-lite", +] + [[package]] name = "async-stream" version = "0.3.6" @@ -621,6 +632,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "auto_impl" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffdcb70bdbc4d478427380519163274ac86e52916e10f0a8889adf0f96d3fee7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "autocfg" version = "1.5.0" @@ -1063,6 +1085,32 @@ dependencies = [ "generic-array 0.14.7", ] +[[package]] +name = "blosc-src" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9046dd58971db0226346fde214143d16a6eb12f535b5320d0ea94fcea420631" +dependencies = [ + "cc", + "libz-sys", + "lz4-sys", + "snappy_src", + "zstd-sys", +] + +[[package]] +name = "blusc" +version = "0.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4e0c17eaa785d2673fe58c22fc817946c2330ed47f3d9f79835d65950d32a45" +dependencies = [ + "flate2", + "lz4_flex", + "pkg-config", + "snap", + "zstd", +] + [[package]] name = "bon" version = "3.8.2" @@ -1129,6 +1177,20 @@ name = "bytemuck" version = "1.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c8efb64bd706a16a1bdde310ae86b351e4d21550d98d056f22f8a7f7a2183fec" +dependencies = [ + "bytemuck_derive", +] + +[[package]] +name = "bytemuck_derive" +version = "1.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9abbd1bc6865053c427f7198e6af43bfdedc55ab791faed4fbd361d789575ff" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] [[package]] name = "byteorder" @@ -1369,6 +1431,15 @@ version = "0.4.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cc14f565cf027a105f7a44ccf9e5b424348421a1d8952a8fc9d499d313107789" +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "const-random" version = "0.1.18" @@ -1404,6 +1475,15 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d52eff69cd5e647efe296129160853a42795992097e8af39800e1060caeea9b" +[[package]] +name = "convert_case" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "633458d4ef8c78b72454de2d54fd6ab2e60f9e02be22f3c6104cdc8a4e0fceb9" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "core-foundation" version = "0.10.1" @@ -1453,6 +1533,15 @@ dependencies = [ "libc", ] +[[package]] +name = "crc32c" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a47af21622d091a8f0fb295b88bc886ac74efcc613efc19f5d0b21de5c89e47" +dependencies = [ + "rustc_version", +] + [[package]] name = "crc32fast" version = "1.5.0" @@ -2398,6 +2487,29 @@ dependencies = [ "serde_core", ] +[[package]] +name = "derive_more" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d751e9e49156b02b44f9c1815bcb94b984cdcc4396ecc32521c739452808b134" +dependencies = [ + "derive_more-impl", +] + +[[package]] +name = "derive_more-impl" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "799a97264921d8623a957f6c3b9011f3b5492f557bbb7a5a19b7fa6d06ba8dcb" +dependencies = [ + "convert_case", + "proc-macro2", + "quote", + "rustc_version", + "syn 2.0.117", + "unicode-xid", +] + [[package]] name = "digest" version = "0.10.7" @@ -2558,6 +2670,27 @@ version = "3.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dea2df4cf52843e0452895c455a1a2cfbb842a1e7329671acf418fdc53ed4c59" +[[package]] +name = "event-listener" +version = "5.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "fastrand" version = "2.4.1" @@ -3019,6 +3152,7 @@ version = "2.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ea2d84b969582b4b1864a92dc5d27cd2b77b622a8d79306834f1be5ba20d84b" dependencies = [ + "bytemuck", "cfg-if", "crunchy", "num-traits", @@ -3571,6 +3705,15 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" +[[package]] +name = "inventory" +version = "0.3.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4f0c30c76f2f4ccee3fe55a2435f691ca00c0e4bd87abe4f4a851b1d4dac39b" +dependencies = [ + "rustversion", +] + [[package]] name = "ipnet" version = "2.11.0" @@ -3877,6 +4020,18 @@ dependencies = [ "zlib-rs", ] +[[package]] +name = "libz-sys" +version = "1.1.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc3a226e576f50782b3305c5ccf458698f92798987f551c6a02efe8276721e22" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "link-cplusplus" version = "1.0.12" @@ -3919,6 +4074,15 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +[[package]] +name = "lru" +version = "0.16.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f66e8d5d03f609abc3a39e6f08e4164ebf1447a732906d39eb9b99b7919ef39" +dependencies = [ + "hashbrown 0.16.1", +] + [[package]] name = "lru" version = "0.18.0" @@ -3934,6 +4098,16 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" +[[package]] +name = "lz4-sys" +version = "1.11.1+lz4-1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bd8c0d6c6ed0cd30b3652886bb8711dc4bb01d637a68105a3d5158039b418e6" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "lz4_flex" version = "0.12.1" @@ -3943,6 +4117,16 @@ dependencies = [ "twox-hash", ] +[[package]] +name = "matrixmultiply" +version = "0.3.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06de3016e9fae57a36fd14dba131fccf49f74b40b7fbdb472f96e361ec71a08" +dependencies = [ + "autocfg", + "rawpointer", +] + [[package]] name = "md-5" version = "0.10.6" @@ -4004,6 +4188,60 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "moka" +version = "0.12.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "957228ad12042ee839f93c8f257b62b4c0ab5eaae1d4fa60de53b27c9d7c5046" +dependencies = [ + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "equivalent", + "parking_lot", + "portable-atomic", + "smallvec", + "tagptr", + "uuid", +] + +[[package]] +name = "monostate" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb4cc965c89dd0615a9e822ff8002f7633d2466143d51bd58693e4b2c75aabad" +dependencies = [ + "monostate-impl", + "serde", + "serde_core", +] + +[[package]] +name = "monostate-impl" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23f5b99488110875b5904839d396c2cdfaf241ff6622638acb879cc7effad5de" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + +[[package]] +name = "ndarray" +version = "0.17.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "520080814a7a6b4a6e9070823bb24b4531daac8c4627e08ba5de8c5ef2f2752d" +dependencies = [ + "matrixmultiply", + "num-complex", + "num-integer", + "num-traits", + "portable-atomic", + "portable-atomic-util", + "rawpointer", +] + [[package]] name = "nibble_vec" version = "0.1.0" @@ -4044,6 +4282,20 @@ dependencies = [ "winapi", ] +[[package]] +name = "num" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35bd024e8b2ff75562e5f34e7f4905839deb4b22955ef5e73d2fea1b9813cb23" +dependencies = [ + "num-bigint", + "num-complex", + "num-integer", + "num-iter", + "num-rational", + "num-traits", +] + [[package]] name = "num-bigint" version = "0.4.6" @@ -4061,6 +4313,7 @@ version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73f88a1307638156682bada9d7604135552957b7818057dcef22705b4d509495" dependencies = [ + "bytemuck", "num-traits", ] @@ -4079,6 +4332,28 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-iter" +version = "0.1.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1429034a0490724d0075ebb2bc9e875d6503c3cf69e235a8941aa757d83ef5bf" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-rational" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f83d14da390562dca69fc84082e73e548e1ad308d24accdedd2720017cb37824" +dependencies = [ + "num-bigint", + "num-integer", + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -4270,6 +4545,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.12.5" @@ -4350,6 +4631,12 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" +[[package]] +name = "pathdiff" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df94ce210e5bc13cb6651479fa48d14f601d9858cfe0467f43ae157023b938d3" + [[package]] name = "pdqselect" version = "0.1.0" @@ -4453,6 +4740,17 @@ dependencies = [ "portable-atomic", ] +[[package]] +name = "positioned-io" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4ec4b80060f033312b99b6874025d9503d2af87aef2dd4c516e253fbfcdada7" +dependencies = [ + "byteorder", + "libc", + "winapi", +] + [[package]] name = "potential_utf" version = "0.1.4" @@ -4646,6 +4944,18 @@ dependencies = [ "serde", ] +[[package]] +name = "quick_cache" +version = "0.6.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1c821816e9b928e20e92ed59bb3ac4aab321d16ca2316871c9fe7ca739cd477" +dependencies = [ + "ahash", + "equivalent", + "hashbrown 0.16.1", + "parking_lot", +] + [[package]] name = "quinn" version = "0.11.9" @@ -4782,6 +5092,12 @@ dependencies = [ "rand 0.10.1", ] +[[package]] +name = "rawpointer" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60a357793950651c4ed0f3f52338f53b2f809f32d83a07f72909fa13e4c6c1e3" + [[package]] name = "rayon" version = "1.12.0" @@ -4802,6 +5118,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "rayon_iter_concurrent_limit" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d09ee01023de07fa073ce14c37cbe0a9e099c6b0b60a29cf4af6d04d9553fed7" +dependencies = [ + "rayon", +] + [[package]] name = "recursive" version = "0.1.1" @@ -5592,7 +5917,7 @@ version = "0.4.0" dependencies = [ "geo-traits", "geo-types", - "lru", + "lru 0.18.0", "rstest", "sedona-testing", "serde", @@ -5814,7 +6139,7 @@ dependencies = [ "arrow-buffer", "criterion", "datafusion-common", - "lru", + "lru 0.18.0", "sedona-common", "sedona-gdal", "sedona-raster", @@ -5824,6 +6149,26 @@ dependencies = [ "tokio", ] +[[package]] +name = "sedona-raster-zarr" +version = "0.4.0" +dependencies = [ + "arrow-array", + "arrow-schema", + "datafusion-common", + "futures", + "log", + "sedona-common", + "sedona-raster", + "sedona-schema", + "serde", + "serde_json", + "tempfile", + "tokio", + "zarrs", + "zarrs_filesystem", +] + [[package]] name = "sedona-s2geography" version = "0.4.0" @@ -5856,7 +6201,7 @@ dependencies = [ "arrow-schema", "criterion", "datafusion-common", - "lru", + "lru 0.18.0", "sedona-common", "serde_json", ] @@ -6052,6 +6397,15 @@ dependencies = [ "tokio", ] +[[package]] +name = "sedonadb-zarr" +version = "0.4.0" +dependencies = [ + "arrow-array", + "pyo3", + "sedona-raster-zarr", +] + [[package]] name = "sedonadbr" version = "0.4.0" @@ -6133,6 +6487,7 @@ version = "1.0.149" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" dependencies = [ + "indexmap 2.13.0", "itoa", "memchr", "serde", @@ -6140,6 +6495,17 @@ dependencies = [ "zmij", ] +[[package]] +name = "serde_repr" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "175ee3e80ae9982737ca543e96133087cbd9a485eecc3bc4de9c1a37b47ea59c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -6247,6 +6613,16 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" +[[package]] +name = "snappy_src" +version = "0.2.5+snappy.1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e1432067a55bcfb1fd522d2aca6537a4fcea32bba87ea86921226d14f9bad53" +dependencies = [ + "cc", + "link-cplusplus", +] + [[package]] name = "socket2" version = "0.6.3" @@ -6406,6 +6782,12 @@ dependencies = [ "windows", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "tar" version = "0.4.45" @@ -6485,6 +6867,15 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + [[package]] name = "thrift" version = "0.17.0" @@ -6804,6 +7195,12 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7264e107f553ccae879d21fbea1d6724ac785e8c3bfc762137959b5802826ef3" +[[package]] +name = "unsafe_cell_slice" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6659959f702dcdaad77bd6e42a9409a32ceccc06943ec93c8a4306be00eb6cf1" + [[package]] name = "untrusted" version = "0.9.0" @@ -7546,6 +7943,198 @@ dependencies = [ "synstructure", ] +[[package]] +name = "zarrs" +version = "0.23.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "251f4ec1a9e60ca9c693ade9b29a0b981a61e68ea3e2ae85fa9da31bcdca5dbe" +dependencies = [ + "async-lock", + "base64", + "blosc-src", + "blusc", + "bytemuck", + "bytes", + "crc32c", + "derive_more", + "flate2", + "getrandom 0.3.4", + "half", + "inventory", + "itertools 0.14.0", + "itoa", + "libz-sys", + "log", + "lru 0.16.4", + "moka", + "ndarray", + "num", + "num-complex", + "paste", + "quick_cache", + "rayon", + "rayon_iter_concurrent_limit", + "serde", + "serde_json", + "thiserror 2.0.17", + "thread_local", + "unsafe_cell_slice", + "uuid", + "zarrs_chunk_grid", + "zarrs_chunk_key_encoding", + "zarrs_codec", + "zarrs_data_type", + "zarrs_filesystem", + "zarrs_metadata", + "zarrs_metadata_ext", + "zarrs_plugin", + "zarrs_storage", + "zstd", +] + +[[package]] +name = "zarrs_chunk_grid" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cf67386fd96a0336cd3e5ab5ca6cb14e0e05aee80f1acae8c4d3cf562a8bb65" +dependencies = [ + "derive_more", + "inventory", + "itertools 0.14.0", + "rayon", + "thiserror 2.0.17", + "tinyvec", + "zarrs_metadata", + "zarrs_plugin", +] + +[[package]] +name = "zarrs_chunk_key_encoding" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9040e7feaa92d1904d492acd0cd91b97214f1791c5b5738e6c05b2ca4145a382" +dependencies = [ + "derive_more", + "inventory", + "zarrs_metadata", + "zarrs_plugin", + "zarrs_storage", +] + +[[package]] +name = "zarrs_codec" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "383a129a6a0cbb2c80cdba23809e5cab85159756464b7d0f112468a495c128da" +dependencies = [ + "async-trait", + "bytemuck", + "derive_more", + "futures", + "inventory", + "itertools 0.14.0", + "rayon", + "thiserror 2.0.17", + "unsafe_cell_slice", + "zarrs_chunk_grid", + "zarrs_data_type", + "zarrs_metadata", + "zarrs_plugin", + "zarrs_storage", +] + +[[package]] +name = "zarrs_data_type" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bc7c594c9363278fcd9db4c205514f009944206eb093ea7ad40b85f50009f31" +dependencies = [ + "derive_more", + "half", + "inventory", + "num", + "paste", + "serde", + "serde_json", + "thiserror 2.0.17", + "zarrs_metadata", + "zarrs_plugin", +] + +[[package]] +name = "zarrs_filesystem" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "270efeb0181651aee5460b3232f2fc83e91bd646cefe75001d1c8f9a4f3abf81" +dependencies = [ + "bytes", + "derive_more", + "itertools 0.14.0", + "libc", + "page_size", + "pathdiff", + "positioned-io", + "thiserror 2.0.17", + "walkdir", + "zarrs_storage", +] + +[[package]] +name = "zarrs_metadata" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d60c4c363a8a302d7babb3c29017850a7b4e0af6ca5f9ba2946263a185b62fea" +dependencies = [ + "derive_more", + "half", + "monostate", + "serde", + "serde_json", + "thiserror 2.0.17", +] + +[[package]] +name = "zarrs_metadata_ext" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2048e07848ca99c7450518e0584929300b1b6a3cf442f18b26ffd3520814bd5b" +dependencies = [ + "derive_more", + "monostate", + "num", + "serde", + "serde_json", + "serde_repr", + "thiserror 2.0.17", + "zarrs_metadata", +] + +[[package]] +name = "zarrs_plugin" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cbe0ed432aee86856f70ca33be36eaf4a0dae21ab730750d9280a7ca1e95046" +dependencies = [ + "paste", + "regex", + "serde_json", + "thiserror 2.0.17", +] + +[[package]] +name = "zarrs_storage" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d098796d2ed4cf94896569615101e0432e870a7665396da5cc32300fb68f7c1" +dependencies = [ + "auto_impl", + "bytes", + "derive_more", + "itertools 0.14.0", + "thiserror 2.0.17", + "unsafe_cell_slice", +] + [[package]] name = "zerocopy" version = "0.8.33" diff --git a/Cargo.toml b/Cargo.toml index 9e276783a..8e5b74c2d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ members = [ "c/sedona-s2geography", "c/sedona-tg", "python/sedonadb", + "python/sedonadb-zarr", "r/sedonadb/src/rust", "rust/sedona-adbc", "rust/sedona-datasource", @@ -39,6 +40,7 @@ members = [ "rust/sedona-raster", "rust/sedona-raster-functions", "rust/sedona-raster-gdal", + "rust/sedona-raster-zarr", "rust/sedona-schema", "rust/sedona-spatial-join", "rust/sedona-spatial-join-geography", @@ -133,6 +135,8 @@ tokio = { version = "1.52", features = ["macros", "rt", "sync"] } url = "2.5.7" wkb = "0.9.2" wkt = "0.14.0" +zarrs = { version = "0.23", default-features = false } +zarrs_filesystem = "0.3" # Workspace path dependencies for internal crates sedona = { version = "0.4.0", path = "rust/sedona" } @@ -150,6 +154,7 @@ sedona-pointcloud = { version = "0.4.0", path = "rust/sedona-pointcloud" } sedona-raster = { version = "0.4.0", path = "rust/sedona-raster" } sedona-raster-functions = { version = "0.4.0", path = "rust/sedona-raster-functions" } sedona-raster-gdal = { version = "0.4.0", path = "rust/sedona-raster-gdal" } +sedona-raster-zarr = { version = "0.4.0", path = "rust/sedona-raster-zarr" } sedona-schema = { version = "0.4.0", path = "rust/sedona-schema" } sedona-spatial-join = { version = "0.4.0", path = "rust/sedona-spatial-join" } sedona-spatial-join-gpu = { version = "0.4.0", path = "rust/sedona-spatial-join-gpu" } diff --git a/python/sedonadb-zarr/.gitignore b/python/sedonadb-zarr/.gitignore new file mode 100644 index 000000000..cf0973ab4 --- /dev/null +++ b/python/sedonadb-zarr/.gitignore @@ -0,0 +1,90 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +/target + +# Byte-compiled / optimized / DLL files +__pycache__/ +.pytest_cache/ +*.py[cod] + +# C extensions +*.so + +# Distribution / packaging +.Python +.venv/ +env/ +bin/ +build/ +develop-eggs/ +dist/ +eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +include/ +man/ +venv/ +*.egg-info/ +.installed.cfg +*.egg + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt +pip-selfcheck.json + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.cache +nosetests.xml +coverage.xml + +# Translations +*.mo + +# Mr Developer +.mr.developer.cfg +.project +.pydevproject + +# Rope +.ropeproject + +# Django stuff: +*.log +*.pot + +.DS_Store + +# Sphinx documentation +docs/_build/ + +# PyCharm +.idea/ + +# VSCode +.vscode/ + +# Pyenv +.python-version + +uv.lock diff --git a/python/sedonadb-zarr/Cargo.toml b/python/sedonadb-zarr/Cargo.toml new file mode 100644 index 000000000..2a4a939c7 --- /dev/null +++ b/python/sedonadb-zarr/Cargo.toml @@ -0,0 +1,33 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "sedonadb-zarr" +version = { workspace = true } +publish = false +edition = "2021" + +[lib] +name = "_lib" +crate-type = ["cdylib"] +# Pure Python extension — no Rust API consumers, so skip rustdoc. +doc = false + +[dependencies] +arrow-array = { workspace = true, features = ["ffi"] } +pyo3 = { version = "0.25.1" } +sedona-raster-zarr = { workspace = true } diff --git a/python/sedonadb-zarr/README.md b/python/sedonadb-zarr/README.md new file mode 100644 index 000000000..5ad19b7ef --- /dev/null +++ b/python/sedonadb-zarr/README.md @@ -0,0 +1,36 @@ + + +# sedonadb-zarr + +Zarr support for [SedonaDB](https://sedona.apache.org/) as an opt-in plugin package. Reads Zarr v3 groups (with sharding, vlen-utf8 dims, etc.) as a column of N-D rasters: + +```python +import sedonadb +import sedonadb_zarr + +con = sedonadb.connect() +con.read_format(sedonadb_zarr.ZarrFormatSpec(), "file:///path/to/foo.zarr").show() +``` + +The main `sedonadb` package does not bundle Zarr support — applications that don't import `sedonadb_zarr` pay no runtime cost. + +## Architecture + +A maturin-built mixed Rust/Python package. The Rust side is a thin PyO3 shim around `sedona-raster-zarr` exposing `PyZarrChunkReader` (implementing `__arrow_c_stream__`). The Python side defines `ZarrFormatSpec(ExternalFormatSpec)`, which sedonadb consumes via `con.read_format(spec, uri)`. The same plugin shape applies to future formats (`sedonadb-cog`, `sedonadb-icechunk`, …). diff --git a/python/sedonadb-zarr/pyproject.toml b/python/sedonadb-zarr/pyproject.toml new file mode 100644 index 000000000..f78d68b0b --- /dev/null +++ b/python/sedonadb-zarr/pyproject.toml @@ -0,0 +1,47 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[build-system] +requires = ["maturin>=1.8,<2.0"] +build-backend = "maturin" + +[project] +name = "sedonadb-zarr" +readme = "README.md" +requires-python = ">=3.9" +classifiers = [ + "Programming Language :: Rust", + "Programming Language :: Python :: Implementation :: CPython", + "Programming Language :: Python :: Implementation :: PyPy", +] +dynamic = ["version"] +dependencies = [ + # `ExternalFormatSpec` + `list_single_object` landed in 0.4.0. + "sedonadb>=0.4.0", +] + +[project.optional-dependencies] +test = [ + "pytest", + "zarr", + "numpy", +] + +[tool.maturin] +features = ["pyo3/extension-module"] +module-name = "sedonadb_zarr._lib" +python-source = "python" diff --git a/python/sedonadb-zarr/python/sedonadb_zarr/__init__.py b/python/sedonadb-zarr/python/sedonadb_zarr/__init__.py new file mode 100644 index 000000000..653e23557 --- /dev/null +++ b/python/sedonadb-zarr/python/sedonadb_zarr/__init__.py @@ -0,0 +1,89 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Zarr support for SedonaDB. + +```python +import sedonadb +import sedonadb_zarr + +con = sedonadb.connect() +con.read_format(sedonadb_zarr.ZarrFormatSpec(), "file:///path/to/foo.zarr").show() +``` + +Importing `sedonadb_zarr` is opt-in — applications that don't import +it pay no runtime cost. +""" + +from typing import Any, Mapping, Optional + +from sedonadb.datasource import ExternalFormatSpec + +from sedonadb_zarr._lib import PyZarrChunkReader + + +class ZarrFormatSpec(ExternalFormatSpec): + """`ExternalFormatSpec` for Zarr groups. + + Use with `con.read_format(spec, uri)`: + + ```python + con.read_format(ZarrFormatSpec(), "file:///path/to/foo.zarr") + ``` + + Supported `with_options` keys: + + - `arrays` (`list[str]`) — explicit subset of group arrays to read. + """ + + _SUPPORTED_OPTIONS = frozenset({"arrays"}) + + def __init__(self, options: Optional[Mapping[str, Any]] = None): + self._options: dict = dict(options) if options else {} + + @property + def extension(self) -> str: + return ".zarr" + + @property + def list_single_object(self) -> bool: + # Zarr groups are directories; the DataFusion listing layer + # returns zero objects at a `.zarr` prefix. + return True + + def with_options(self, options: Mapping[str, Any]) -> "ZarrFormatSpec": + unknown = set(options) - self._SUPPORTED_OPTIONS + if unknown: + raise ValueError( + f"ZarrFormatSpec: unknown option(s) {sorted(unknown)!r}; " + f"supported: {sorted(self._SUPPORTED_OPTIONS)!r}" + ) + merged = {**self._options, **options} + return ZarrFormatSpec(merged) + + def open_reader(self, args: Any) -> PyZarrChunkReader: + uri = args.src.to_url() + if uri is None: + raise ValueError( + "ZarrFormatSpec: could not resolve a URL from the source object" + ) + arrays = self._options.get("arrays") + batch_size = args.batch_size if args.batch_size is not None else 8192 + return PyZarrChunkReader(uri, arrays, batch_size) + + +__all__ = ["ZarrFormatSpec", "PyZarrChunkReader"] diff --git a/python/sedonadb-zarr/src/lib.rs b/python/sedonadb-zarr/src/lib.rs new file mode 100644 index 000000000..0f70c8fa0 --- /dev/null +++ b/python/sedonadb-zarr/src/lib.rs @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! PyO3 shim around `sedona-raster-zarr`. Exposes `PyZarrChunkReader`, +//! a single-use `__arrow_c_stream__` wrapper consumed by the +//! Python-side `ZarrFormatSpec`. + +use std::ffi::CString; +use std::sync::Mutex; + +use arrow_array::ffi_stream::FFI_ArrowArrayStream; +use pyo3::exceptions::{PyRuntimeError, PyValueError}; +use pyo3::prelude::*; +use pyo3::types::PyCapsule; +use sedona_raster_zarr::ZarrChunkReader; + +/// Single-use `__arrow_c_stream__` wrapper around `ZarrChunkReader`. +#[pyclass] +pub struct PyZarrChunkReader { + inner: Mutex>, +} + +#[pymethods] +impl PyZarrChunkReader { + #[new] + #[pyo3(signature = (uri, arrays=None, batch_size=8192))] + fn new(uri: &str, arrays: Option>, batch_size: usize) -> PyResult { + let reader = ZarrChunkReader::try_new(uri, arrays.as_deref(), batch_size) + .map_err(|e| PyValueError::new_err(e.to_string()))?; + Ok(Self { + inner: Mutex::new(Some(reader)), + }) + } + + #[pyo3(signature = (requested_schema=None))] + fn __arrow_c_stream__<'py>( + &self, + py: Python<'py>, + #[allow(unused_variables)] requested_schema: Option>, + ) -> PyResult> { + let reader = self + .inner + .lock() + .map_err(|_| PyRuntimeError::new_err("PyZarrChunkReader mutex poisoned"))? + .take() + .ok_or_else(|| { + PyRuntimeError::new_err( + "PyZarrChunkReader has already been consumed; \ + a RecordBatchReader can only be exported once.", + ) + })?; + let ffi_stream = FFI_ArrowArrayStream::new(Box::new(reader)); + let capsule_name = CString::new("arrow_array_stream") + .map_err(|e| PyRuntimeError::new_err(e.to_string()))?; + PyCapsule::new(py, ffi_stream, Some(capsule_name)) + } +} + +#[pymodule] +fn _lib(m: &Bound<'_, PyModule>) -> PyResult<()> { + m.add_class::()?; + Ok(()) +} diff --git a/python/sedonadb-zarr/tests/test_zarr.py b/python/sedonadb-zarr/tests/test_zarr.py new file mode 100644 index 000000000..48a510483 --- /dev/null +++ b/python/sedonadb-zarr/tests/test_zarr.py @@ -0,0 +1,116 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Tests for the `sedonadb-zarr` plugin. + +Plugin surface: `ZarrFormatSpec(ExternalFormatSpec)` paired with +`con.read_format(spec, uri)`. The SQL UDTF form (`sd_read_zarr`) is +deferred to a follow-up PR. +""" + +import numpy as np +import pytest + +import sedonadb +import sedonadb_zarr + + +@pytest.fixture +def zarr_group(tmp_path): + """Build a tiny 2x2 UInt8 Zarr v3 group with two chunks.""" + zarr = pytest.importorskip("zarr") + root = zarr.open_group(str(tmp_path), mode="w") + arr = root.create_array( + "temperature", + shape=(2, 2), + chunks=(1, 2), + dtype="uint8", + dimension_names=["y", "x"], + ) + arr[:] = np.array([[10, 11], [20, 21]], dtype=np.uint8) + return tmp_path + + +def test_format_spec_via_read_format(zarr_group): + con = sedonadb.connect() + df = con.read_format(sedonadb_zarr.ZarrFormatSpec(), f"file://{zarr_group}") + arrow_tab = df.to_arrow_table() + assert arrow_tab.num_rows == 2 + assert arrow_tab.column_names == ["raster"] + + raster = arrow_tab["raster"][0].as_py() + assert isinstance(raster, dict), f"raster row is {type(raster).__name__}" + for field in ("transform", "bands"): + assert field in raster, f"raster row missing {field!r}: {sorted(raster)}" + assert isinstance(raster["bands"], list) and len(raster["bands"]) >= 1 + band = raster["bands"][0] + # `data` is empty (OutDb scan); `outdb_uri` points at this chunk. + assert band.get("data") in (None, b"", bytes()), ( + f"OutDb band should have empty data; got {band.get('data')!r}" + ) + anchor = band.get("outdb_uri") + assert anchor and "#array=temperature" in anchor, f"unexpected anchor: {anchor!r}" + + +def test_format_spec_with_arrays_option(zarr_group): + con = sedonadb.connect() + spec = sedonadb_zarr.ZarrFormatSpec().with_options({"arrays": ["temperature"]}) + df = con.read_format(spec, f"file://{zarr_group}") + assert df.to_arrow_table().num_rows == 2 + + +def test_format_spec_class_invariants(): + spec = sedonadb_zarr.ZarrFormatSpec() + assert spec.extension == ".zarr" + spec2 = spec.with_options({"arrays": ["temperature"]}) + assert spec2 is not spec + assert spec2._options.get("arrays") == ["temperature"] + + +# Each numpy dtype below maps to a different `BandDataType` arm in +# `rust/sedona-raster-zarr/src/dtype.rs::zarr_to_band_data_type`. +@pytest.mark.parametrize( + "numpy_dtype", + [ + "bool", + "int8", + "uint8", + "int16", + "uint16", + "int32", + "uint32", + "int64", + "uint64", + "float32", + "float64", + ], +) +def test_dtype_mapping_roundtrips(tmp_path, numpy_dtype): + zarr = pytest.importorskip("zarr") + root = zarr.open_group(str(tmp_path), mode="w") + arr = root.create_array( + "temperature", + shape=(2, 2), + chunks=(1, 2), + dtype=numpy_dtype, + dimension_names=["y", "x"], + ) + arr[:] = np.ones((2, 2), dtype=numpy_dtype) + + con = sedonadb.connect() + df = con.read_format(sedonadb_zarr.ZarrFormatSpec(), f"file://{tmp_path}") + assert df.to_arrow_table().num_rows == 2 diff --git a/python/sedonadb/Cargo.toml b/python/sedonadb/Cargo.toml index 238dfe2ea..297aff412 100644 --- a/python/sedonadb/Cargo.toml +++ b/python/sedonadb/Cargo.toml @@ -24,6 +24,8 @@ edition = "2021" [lib] name = "_lib" crate-type = ["cdylib"] +# Pure Python extension — no Rust API consumers, so skip rustdoc. +doc = false [features] default = ["mimalloc"] diff --git a/python/sedonadb/python/sedonadb/context.py b/python/sedonadb/python/sedonadb/context.py index 61842e77a..dda79c724 100644 --- a/python/sedonadb/python/sedonadb/context.py +++ b/python/sedonadb/python/sedonadb/context.py @@ -20,7 +20,20 @@ import sys from functools import cached_property from pathlib import Path -from typing import Any, Dict, Iterable, List, Literal, Optional, Tuple, Union +from typing import ( + Any, + Dict, + Iterable, + List, + Literal, + Optional, + TYPE_CHECKING, + Tuple, + Union, +) + +if TYPE_CHECKING: + from sedonadb.datasource import ExternalFormatSpec from sedonadb._lib import ( InternalContext, @@ -333,6 +346,50 @@ def read_pyogrio( self.options, ) + def read_format( + self, + spec: "ExternalFormatSpec", + table_paths: Union[str, Path, Iterable[str]], + check_extension: bool = False, + ) -> DataFrame: + """Read one or more paths using a Python-defined `ExternalFormatSpec`. + + This is the plugin entry point: a format-specific package (e.g. + `sedonadb-zarr`) defines an `ExternalFormatSpec` subclass and the + user reads through it via this method. Built-in formats have + their own dedicated readers (`read_parquet`, `read_pyogrio`). + + Format-specific options are passed via the spec itself using + `spec.with_options({...})`, which returns a configured copy. + Unlike `read_pyogrio`, this method has no `options=` keyword — + each spec class documents its own supported keys. + + Args: + spec: An `ExternalFormatSpec` instance describing how to open + the underlying source. + table_paths: A str, Path, or iterable of paths/URLs. + check_extension: When `True`, error if a non-collection path + doesn't end in the spec's `extension`. Defaults to `False`. + + Examples: + >>> import sedonadb_zarr # doctest: +SKIP + >>> sd = sedona.db.connect() + >>> spec = sedonadb_zarr.ZarrFormatSpec().with_options( # doctest: +SKIP + ... {"arrays": ["temperature"]} + ... ) + >>> sd.read_format(spec, "file:///path/to/foo.zarr").show() # doctest: +SKIP + """ + if isinstance(table_paths, (str, Path)): + table_paths = [table_paths] + + return DataFrame( + self._impl, + self._impl.read_external_format( + spec, [str(path) for path in table_paths], check_extension + ), + self.options, + ) + def sql( self, sql: str, *, params: Union[List, Tuple, Dict, None] = None ) -> DataFrame: diff --git a/python/sedonadb/python/sedonadb/datasource.py b/python/sedonadb/python/sedonadb/datasource.py index dd7a24ab6..17c5ebdba 100644 --- a/python/sedonadb/python/sedonadb/datasource.py +++ b/python/sedonadb/python/sedonadb/datasource.py @@ -50,6 +50,20 @@ def extension(self): """ return "" + @property + def list_single_object(self) -> bool: + """Whether each URI should be treated as a single opaque object + + Default ``False``: the URI is fed through DataFusion's listing layer, + which enumerates files matching :attr:`extension` at the prefix. + + Override to ``True`` for directory-shaped formats (e.g. a Zarr group is + a directory, not a file) where the listing layer would return zero + matching files. The URI is then passed straight to + :meth:`open_reader` as one object. + """ + return False + def with_options(self, options: Mapping[str, Any]): """Clone this instance and return a new instance with options applied diff --git a/python/sedonadb/src/datasource.rs b/python/sedonadb/src/datasource.rs index e746f36f9..f3c254f27 100644 --- a/python/sedonadb/src/datasource.rs +++ b/python/sedonadb/src/datasource.rs @@ -47,6 +47,11 @@ use crate::{ #[derive(Debug)] pub struct PyExternalFormat { extension: String, + /// Cached at construction time. The Python side declares this via + /// the `list_single_object` attribute on the spec class (default + /// `False`); we snapshot it once to avoid GIL traffic in + /// `list_single_object()`, which is called on hot paths. + list_single_object: bool, py_spec: PyObject, } @@ -54,6 +59,7 @@ impl Clone for PyExternalFormat { fn clone(&self) -> Self { Python::with_gil(|py| Self { extension: self.extension.clone(), + list_single_object: self.list_single_object, py_spec: self.py_spec.clone_ref(py), }) } @@ -71,8 +77,10 @@ impl PyExternalFormat { let new_extension = new_py_spec .getattr(py, "extension")? .extract::(py)?; + let new_list_single_object = read_list_single_object(py, &new_py_spec)?; Ok(Self { extension: new_extension, + list_single_object: new_list_single_object, py_spec: new_py_spec, }) } @@ -143,16 +151,40 @@ impl PyExternalFormat { #[new] fn new<'py>(py: Python<'py>, py_spec: PyObject) -> Result { let extension = py_spec.getattr(py, "extension")?.extract::(py)?; - Ok(Self { extension, py_spec }) + let list_single_object = read_list_single_object(py, &py_spec)?; + Ok(Self { + extension, + list_single_object, + py_spec, + }) } } +/// Read the `list_single_object` attribute on a Python spec. +/// +/// Defined on the [`ExternalFormatSpec`] base class (defaults to +/// `False`), so any spec inheriting from the base carries it. A +/// duck-typed spec that doesn't inherit will raise `AttributeError` +/// — that's the intended failure mode. +fn read_list_single_object<'py>( + py: Python<'py>, + py_spec: &PyObject, +) -> Result { + Ok(py_spec + .getattr(py, "list_single_object")? + .extract::(py)?) +} + #[async_trait] impl ExternalFormatSpec for PyExternalFormat { fn extension(&self) -> &str { &self.extension } + fn list_single_object(&self) -> bool { + self.list_single_object + } + fn with_options( &self, options: &HashMap, @@ -180,7 +212,7 @@ impl ExternalFormatSpec for PyExternalFormat { } } -/// Wrapper around the [Object] such that the [PyExternalFormatSpec] can pass +/// Wrapper around the [Object] such that the [PyExternalFormat] can pass /// required information into Python method calls /// /// Currently this only exposes `to_url()`; however, we can and should expose @@ -198,7 +230,7 @@ impl PyDataSourceObject { } } -/// Wrapper around the [OpenReaderArgs] such that the [PyExternalFormatSpec] can pass +/// Wrapper around the [OpenReaderArgs] such that the [PyExternalFormat] can pass /// required information into Python method calls #[pyclass] #[derive(Clone, Debug)] @@ -267,7 +299,7 @@ impl PyOpenReaderArgs { } } -/// Wrapper around a PhysicalExpr such that the [PyExternalFormatSpec] can pass +/// Wrapper around a PhysicalExpr such that the [PyExternalFormat] can pass /// required information into Python method calls /// /// This currently only exposes `bounding_box()`, but in the future could expose diff --git a/python/sedonadb/src/lib.rs b/python/sedonadb/src/lib.rs index 6c1437b18..cff5231a3 100644 --- a/python/sedonadb/src/lib.rs +++ b/python/sedonadb/src/lib.rs @@ -14,6 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. + use crate::{error::PySedonaError, udf::sedona_scalar_udf}; use pyo3::{ffi::Py_uintptr_t, prelude::*}; use sedona_adbc::AdbcSedonadbDriverInit; diff --git a/rust/sedona-datasource/src/format.rs b/rust/sedona-datasource/src/format.rs index 20c158f2b..9df4a4251 100644 --- a/rust/sedona-datasource/src/format.rs +++ b/rust/sedona-datasource/src/format.rs @@ -409,7 +409,7 @@ mod test { use tempfile::TempDir; use url::Url; - use crate::provider::external_listing_table; + use crate::provider::external_table; use super::*; @@ -613,7 +613,7 @@ mod test { let (_temp_dir, files) = create_echo_spec_temp_dir(); // Select using a listing table and ensure we get a result - let provider = external_listing_table( + let provider = external_table( spec, &ctx, files @@ -625,12 +625,7 @@ mod test { .await .unwrap(); - let batches = ctx - .read_table(Arc::new(provider)) - .unwrap() - .collect() - .await - .unwrap(); + let batches = ctx.read_table(provider).unwrap().collect().await.unwrap(); // We should get one value per partition assert_eq!(batches.len(), 2); @@ -648,7 +643,7 @@ mod test { let (_temp_dir, files) = create_echo_spec_temp_dir(); // Select using a listing table and ensure we get a result with the option passed - let provider = external_listing_table( + let provider = external_table( spec, &ctx, files @@ -661,7 +656,7 @@ mod test { .unwrap(); let batches = ctx - .read_table(Arc::new(provider)) + .read_table(provider) .unwrap() .select(vec![col("batch_size"), col("option_value")]) .unwrap() @@ -691,7 +686,7 @@ mod test { let (temp_dir, mut files) = create_echo_spec_temp_dir(); // Listing table with no files should error - let err = external_listing_table(spec.clone(), &ctx, vec![], true) + let err = external_table(spec.clone(), &ctx, vec![], true) .await .unwrap_err(); assert_eq!(err.message(), "No table paths were provided"); @@ -705,7 +700,7 @@ mod test { files.push(file2); // With check_extension as true we should get an error - let err = external_listing_table( + let err = external_table( spec.clone(), &ctx, files @@ -722,7 +717,7 @@ mod test { .ends_with("does not match the expected extension 'echospec'")); // ...but we should be able to turn off the error - external_listing_table( + external_table( spec, &ctx, files @@ -734,4 +729,121 @@ mod test { .await .unwrap(); } + + /// Spec for a directory-shaped format whose "object" is the + /// directory itself. Used to exercise the + /// [`SingleObjectExternalTable`] path through + /// [`external_table`]. + #[derive(Debug, Default, Clone)] + struct DirectorySpec; + + #[async_trait] + impl ExternalFormatSpec for DirectorySpec { + fn extension(&self) -> &str { + ".dirfmt" + } + + fn list_single_object(&self) -> bool { + true + } + + fn with_options( + &self, + _options: &HashMap, + ) -> Result> { + Ok(Arc::new(self.clone())) + } + + async fn infer_schema(&self, location: &Object) -> Result { + // The single-object provider must synthesise an ObjectMeta + // before calling us; assert that contract here. + assert!( + location.meta.is_some(), + "single-object scan must synthesise an ObjectMeta", + ); + Ok(Schema::new(vec![ + Field::new("uri_path", DataType::Utf8, false), + Field::new("row_idx", DataType::Int32, false), + ])) + } + + async fn open_reader( + &self, + args: &OpenReaderArgs, + ) -> Result> { + let meta = args + .src + .meta + .as_ref() + .expect("single-object scan must synthesise an ObjectMeta"); + let path = meta.location.to_string(); + let schema = Arc::new(self.infer_schema(&args.src).await?); + let mut batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(StringArray::from(vec![path])), + Arc::new(Int32Array::from(vec![0])), + ], + )?; + if let Some(projection) = &args.file_projection { + batch = batch.project(projection)?; + } + Ok(Box::new(RecordBatchIterator::new([Ok(batch)], schema))) + } + } + + #[tokio::test] + async fn single_object_table_skips_listing() { + // The fixture dir is *not* a `.dirfmt` directory and contains + // nothing matching that extension. A listing-based provider + // would return zero objects and error on schema inference. + let spec = Arc::new(DirectorySpec); + let temp_dir = TempDir::new().unwrap(); + let dir_path = temp_dir.path().join("group.dirfmt"); + std::fs::create_dir(&dir_path).unwrap(); + // Make the directory non-empty so it looks like a real + // directory-shaped artefact, not just a missing entry. + std::fs::File::create(dir_path.join("metadata.json")) + .unwrap() + .write_all(b"{}") + .unwrap(); + + let ctx = SessionContext::new(); + let url = ListingTableUrl::parse(dir_path.to_string_lossy()).unwrap(); + let provider = external_table(spec, &ctx, vec![url], false).await.unwrap(); + + let batches = ctx.read_table(provider).unwrap().collect().await.unwrap(); + + assert_eq!(batches.len(), 1); + assert_eq!(batches[0].num_rows(), 1); + // The synthesised ObjectMeta::location is the URL path within + // the object store — non-empty means we passed the URI through + // without trying to list inside it. + let path_col = batches[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert!(!path_col.value(0).is_empty()); + assert!(path_col.value(0).ends_with("group.dirfmt")); + } + + #[tokio::test] + async fn single_object_table_rejects_mixed_object_stores() { + let spec = Arc::new(DirectorySpec); + let ctx = SessionContext::new(); + // Mix file:// + https:// — both parse fine but resolve to + // different ObjectStoreUrls, which the single-object provider + // doesn't try to span. + let url_a = ListingTableUrl::parse("file:///tmp/a.dirfmt").unwrap(); + let url_b = ListingTableUrl::parse("https://example.com/b.dirfmt").unwrap(); + let err = external_table(spec, &ctx, vec![url_a, url_b], false) + .await + .unwrap_err(); + assert!( + err.message().contains("same object store"), + "unexpected error: {}", + err.message() + ); + } } diff --git a/rust/sedona-datasource/src/provider.rs b/rust/sedona-datasource/src/provider.rs index 6b06a70d1..1caf699a6 100644 --- a/rust/sedona-datasource/src/provider.rs +++ b/rust/sedona-datasource/src/provider.rs @@ -15,25 +15,68 @@ // specific language governing permissions and limitations // under the License. +use std::any::Any; use std::sync::Arc; use arrow_schema::SchemaRef; use async_trait::async_trait; use datafusion::{ + catalog::TableProvider, config::TableOptions, - datasource::listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl}, + datasource::{ + file_format::FileFormat, + listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl}, + physical_plan::FileScanConfig, + TableType, + }, execution::{options::ReadOptions, SessionState}, + logical_expr::Expr, + physical_plan::ExecutionPlan, prelude::{SessionConfig, SessionContext}, }; +use datafusion_catalog::{memory::DataSourceExec, Session}; use datafusion_common::{exec_err, Result}; +use datafusion_datasource::{ + file_groups::FileGroup, file_scan_config::FileScanConfigBuilder, table_schema::TableSchema, + PartitionedFile, +}; +use datafusion_execution::object_store::ObjectStoreUrl; +use object_store::{path::Path as ObjectPath, ObjectMeta}; -use crate::{format::ExternalFileFormat, spec::ExternalFormatSpec}; +use crate::{ + format::ExternalFileFormat, + spec::{ExternalFormatSpec, Object}, +}; -/// Create a [ListingTable] from an [ExternalFormatSpec] and one or more URLs +/// Resolve an [ExternalFormatSpec] + URLs into a [TableProvider]. /// -/// This can be used to resolve a format specification into a TableProvider that -/// may be registered with a [SessionContext]. -pub async fn external_listing_table( +/// Dispatches on [`ExternalFormatSpec::list_single_object`]: +/// - `false` (default): builds a [`ListingTable`] that lists files at +/// the URL prefix matching the spec's extension. Best for formats +/// whose unit of work is a single file (Parquet, FlatGeobuf, ...). +/// - `true`: builds a [`SingleObjectExternalTable`] that treats each +/// URI as one opaque object, skipping listing entirely. Required +/// for directory-shaped formats like Zarr. +pub async fn external_table( + spec: Arc, + context: &SessionContext, + table_paths: Vec, + check_extension: bool, +) -> Result> { + if table_paths.is_empty() { + return exec_err!("No table paths were provided"); + } + + if spec.list_single_object() { + let provider = SingleObjectExternalTable::try_new(spec, context, table_paths).await?; + Ok(Arc::new(provider) as Arc) + } else { + let provider = listing_table_provider(spec, context, table_paths, check_extension).await?; + Ok(Arc::new(provider) as Arc) + } +} + +async fn listing_table_provider( spec: Arc, context: &SessionContext, table_paths: Vec, @@ -49,10 +92,6 @@ pub async fn external_listing_table( let option_extension = listing_options.file_extension.clone(); - if table_paths.is_empty() { - return exec_err!("No table paths were provided"); - } - // check if the file extension matches the expected extension if one is provided if !option_extension.is_empty() && options.check_extension { for path in &table_paths { @@ -110,3 +149,131 @@ impl ReadOptions<'_> for RecordBatchReaderTableOptions { .await } } + +/// [`TableProvider`] that treats each input URI as one opaque object, +/// bypassing the listing layer. Built when +/// [`ExternalFormatSpec::list_single_object`] is `true` — required for +/// directory-shaped formats like Zarr. Each URI lands in its own +/// [`FileGroup`] so multi-URI scans can fan out across partitions. +#[derive(Debug)] +pub struct SingleObjectExternalTable { + spec: Arc, + schema: SchemaRef, + /// `(scheme-level store URL, path within store)`. Mixed-store + /// inputs are rejected in `try_new`. + files: Vec<(ObjectStoreUrl, ObjectPath)>, +} + +impl SingleObjectExternalTable { + async fn try_new( + spec: Arc, + context: &SessionContext, + table_paths: Vec, + ) -> Result { + let first_store = table_paths[0].object_store(); + for path in table_paths.iter().skip(1) { + let store = path.object_store(); + if store != first_store { + let first_uri = table_paths[0].as_str(); + let bad_uri = path.as_str(); + return exec_err!( + "all URIs must share the same object store; got '{first_uri}' \ + (store '{first_store}') and '{bad_uri}' (store '{store}')" + ); + } + } + + let files: Vec<(ObjectStoreUrl, ObjectPath)> = table_paths + .iter() + .map(|p| (p.object_store(), p.prefix().clone())) + .collect(); + + let store = context.runtime_env().object_store(&first_store)?; + let probe = Object { + store: Some(store), + url: Some(first_store.clone()), + meta: Some(synthetic_object_meta(&files[0].1)), + range: None, + }; + let schema = Arc::new(spec.infer_schema(&probe).await?); + + Ok(Self { + spec, + schema, + files, + }) + } +} + +#[async_trait] +impl TableProvider for SingleObjectExternalTable { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + _state: &dyn Session, + projection: Option<&Vec>, + _filters: &[Expr], + limit: Option, + ) -> Result> { + // `_filters` is dropped: no `supports_filters_pushdown` override + // means DataFusion won't claim our scan handles them, and the + // filter node above us applies them itself. Spatial-bbox + // pushdown (`PyFilter::bounding_box`) therefore doesn't fire + // for single-object formats today. + let (object_store_url, _) = &self.files[0]; + + let table_schema = TableSchema::new(self.schema.clone(), vec![]); + let format = ExternalFileFormat::new(self.spec.clone()); + let file_source = format.file_source(table_schema); + + // One FileGroup per URI so multi-URI scans can fan out across + // partitions instead of being pinned to one. + let file_groups: Vec = self + .files + .iter() + .map(|(_, location)| { + FileGroup::new(vec![PartitionedFile { + object_meta: synthetic_object_meta(location), + partition_values: vec![], + range: None, + extensions: None, + statistics: None, + metadata_size_hint: None, + }]) + }) + .collect(); + + let mut builder = FileScanConfigBuilder::new(object_store_url.clone(), file_source) + .with_file_groups(file_groups) + .with_limit(limit); + if let Some(indices) = projection { + builder = builder.with_projection_indices(Some(indices.clone()))?; + } + let config: FileScanConfig = builder.build(); + Ok(DataSourceExec::from_data_source(config)) + } +} + +/// Synthesise an [`ObjectMeta`] without statting. `size = u64::MAX` +/// rather than `0` so DataFusion's size-aware pruning doesn't treat +/// the entry as an empty file and skip it. +fn synthetic_object_meta(location: &ObjectPath) -> ObjectMeta { + ObjectMeta { + location: location.clone(), + last_modified: Default::default(), + size: u64::MAX, + e_tag: None, + version: None, + } +} diff --git a/rust/sedona-datasource/src/spec.rs b/rust/sedona-datasource/src/spec.rs index 2c2ca31a3..e82c921ad 100644 --- a/rust/sedona-datasource/src/spec.rs +++ b/rust/sedona-datasource/src/spec.rs @@ -70,6 +70,24 @@ pub trait ExternalFormatSpec: Debug + Send + Sync { "" } + /// Whether each URI should be treated as a single opaque object + /// (no `ObjectStore` listing or extension matching). + /// + /// Default `false`: the URI is fed through DataFusion's + /// `ListingTableUrl` and the listing fan-out enumerates matching + /// files at the prefix. + /// + /// Set to `true` for directory-shaped formats like Zarr where the + /// "object" is the directory itself, not the files within it. With + /// `true`, [`external_table`](crate::provider::external_table) + /// skips listing and passes each URI directly to + /// [`ExternalFormatSpec::infer_schema`] and + /// [`ExternalFormatSpec::open_reader`] as one + /// [`Object`]. + fn list_single_object(&self) -> bool { + false + } + /// Fill in default options from [TableOptions] /// /// The TableOptions are a DataFusion concept that provide a means by which diff --git a/rust/sedona-raster-zarr/Cargo.toml b/rust/sedona-raster-zarr/Cargo.toml new file mode 100644 index 000000000..93e946812 --- /dev/null +++ b/rust/sedona-raster-zarr/Cargo.toml @@ -0,0 +1,49 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +[package] +name = "sedona-raster-zarr" +version.workspace = true +license.workspace = true +keywords.workspace = true +categories.workspace = true +homepage.workspace = true +repository.workspace = true +description.workspace = true +readme.workspace = true +edition.workspace = true +rust-version.workspace = true + +[lints.clippy] +result_large_err = "allow" + +[dependencies] +arrow-array = { workspace = true } +arrow-schema = { workspace = true } +datafusion-common = { workspace = true } +futures = { workspace = true } +log = { workspace = true } +sedona-common = { workspace = true } +sedona-raster = { workspace = true } +sedona-schema = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +zarrs = { workspace = true, features = ["filesystem", "gzip", "zstd", "blosc", "crc32c", "sharding", "transpose"] } +zarrs_filesystem = { workspace = true } + +[dev-dependencies] +tempfile = { workspace = true } +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } diff --git a/rust/sedona-raster-zarr/src/dtype.rs b/rust/sedona-raster-zarr/src/dtype.rs new file mode 100644 index 000000000..65d234d7b --- /dev/null +++ b/rust/sedona-raster-zarr/src/dtype.rs @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Mapping between Zarr datatypes and SedonaDB's `BandDataType`. +//! +//! zarrs 0.23 models `DataType` as a wrapper around `Arc`, +//! so we discriminate via the type-erased `is::()` check rather than +//! pattern-matching an enum. + +use arrow_schema::ArrowError; +use sedona_schema::raster::BandDataType; +use zarrs::array::data_type::{ + BoolDataType, Float32DataType, Float64DataType, Int16DataType, Int32DataType, Int64DataType, + Int8DataType, UInt16DataType, UInt32DataType, UInt64DataType, UInt8DataType, +}; +use zarrs::array::DataType as ZarrDataType; + +/// Map a Zarr `DataType` to a SedonaDB `BandDataType`. +/// +/// Bool maps to UInt8 losslessly (Zarr packs bools to one byte each, matching +/// our representation). Variable-length, complex, and extended-precision +/// types error with `NotYetImplemented` — they have no numeric counterpart +/// in `BandDataType` today. +pub fn zarr_to_band_data_type(dt: &ZarrDataType) -> Result { + if dt.is::() { + Ok(BandDataType::UInt8) + } else if dt.is::() { + Ok(BandDataType::Int8) + } else if dt.is::() { + Ok(BandDataType::UInt8) + } else if dt.is::() { + Ok(BandDataType::Int16) + } else if dt.is::() { + Ok(BandDataType::UInt16) + } else if dt.is::() { + Ok(BandDataType::Int32) + } else if dt.is::() { + Ok(BandDataType::UInt32) + } else if dt.is::() { + Ok(BandDataType::Int64) + } else if dt.is::() { + Ok(BandDataType::UInt64) + } else if dt.is::() { + Ok(BandDataType::Float32) + } else if dt.is::() { + Ok(BandDataType::Float64) + } else { + Err(ArrowError::NotYetImplemented(format!( + "Zarr datatype {dt:?} has no BandDataType mapping yet" + ))) + } +} diff --git a/rust/sedona-raster-zarr/src/geozarr.rs b/rust/sedona-raster-zarr/src/geozarr.rs new file mode 100644 index 000000000..63c0eb8f0 --- /dev/null +++ b/rust/sedona-raster-zarr/src/geozarr.rs @@ -0,0 +1,228 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! GeoZarr attribute parsing for CRS and affine transform. +//! +//! Reads the `proj:*` (CRS) and `spatial:*` (transform / spatial dim +//! mapping) attribute conventions from a Zarr group's attributes, mapping +//! them onto SedonaDB's per-raster `crs` and `transform` fields. +//! +//! Attributes live at the group level and are inherited by every array. +//! Per-array overrides are rejected by the group-constraint validator +//! (see `loader`). + +use arrow_schema::ArrowError; + +/// Per-group geo metadata distilled from `proj:*` / `spatial:*` attributes. +#[derive(Debug, Clone, PartialEq)] +pub struct GroupGeoMetadata { + /// CRS string in PROJ or WKT format (whichever the group declared). + /// `None` if no `proj:wkt2` / `proj:projjson` / `proj:epsg` attribute + /// is present on the group. + pub crs: Option, + /// Affine transform in GDAL GeoTransform order: + /// `[origin_x, scale_x, skew_x, origin_y, skew_y, scale_y]`. `None` + /// when no `spatial:transform` attribute is present. + pub transform: Option<[f64; 6]>, + /// Names of the spatial dimensions in the order the group declares + /// them (typically `["y", "x"]`). `None` falls back to a 2-D default + /// at construction time in the loader. + pub spatial_dims: Option>, +} + +impl GroupGeoMetadata { + /// Parse the group-level attributes object (the raw JSON map zarrs + /// surfaces from a group) into a `GroupGeoMetadata`. + /// + /// Returns `Ok(default-empty)` when none of the conventional keys are + /// present — geospatial metadata is optional; downstream fall-backs in + /// the loader provide identity transforms when needed. + pub fn from_attributes( + attrs: &serde_json::Map, + ) -> Result { + let crs = parse_crs(attrs)?; + let transform = parse_transform(attrs)?; + let spatial_dims = parse_spatial_dims(attrs)?; + Ok(Self { + crs, + transform, + spatial_dims, + }) + } +} + +fn parse_crs( + attrs: &serde_json::Map, +) -> Result, ArrowError> { + // GeoZarr `proj:` precedence — wkt2 wins over projjson wins over epsg + // code. Match how downstream tools (e.g. xarray + rioxarray) resolve + // multi-attribute groups: more specific representations override + // numeric codes. + if let Some(v) = attrs.get("proj:wkt2") { + return Ok(Some(json_value_to_string(v, "proj:wkt2")?)); + } + if let Some(v) = attrs.get("proj:projjson") { + return Ok(Some(json_value_to_string(v, "proj:projjson")?)); + } + if let Some(v) = attrs.get("proj:epsg") { + let code = v.as_i64().ok_or_else(|| { + ArrowError::InvalidArgumentError("proj:epsg attribute must be an integer".into()) + })?; + return Ok(Some(format!("EPSG:{code}"))); + } + Ok(None) +} + +fn parse_transform( + attrs: &serde_json::Map, +) -> Result, ArrowError> { + let Some(t) = attrs.get("spatial:transform") else { + return Ok(None); + }; + let arr = t.as_array().ok_or_else(|| { + ArrowError::InvalidArgumentError("spatial:transform attribute must be a JSON array".into()) + })?; + if arr.len() != 6 { + return Err(ArrowError::InvalidArgumentError(format!( + "spatial:transform must have 6 elements (GDAL GeoTransform order); got {}", + arr.len() + ))); + } + let mut out = [0f64; 6]; + for (i, v) in arr.iter().enumerate() { + out[i] = v.as_f64().ok_or_else(|| { + ArrowError::InvalidArgumentError(format!("spatial:transform[{i}] must be a number")) + })?; + } + Ok(Some(out)) +} + +fn parse_spatial_dims( + attrs: &serde_json::Map, +) -> Result>, ArrowError> { + let Some(v) = attrs.get("spatial:dims") else { + return Ok(None); + }; + let arr = v.as_array().ok_or_else(|| { + ArrowError::InvalidArgumentError( + "spatial:dims attribute must be a JSON array of strings".into(), + ) + })?; + let dims = arr + .iter() + .map(|e| { + e.as_str().map(String::from).ok_or_else(|| { + ArrowError::InvalidArgumentError("spatial:dims entries must be strings".into()) + }) + }) + .collect::, _>>()?; + Ok(Some(dims)) +} + +fn json_value_to_string(v: &serde_json::Value, attr_name: &str) -> Result { + if let Some(s) = v.as_str() { + return Ok(s.to_string()); + } + if v.is_object() { + return Ok(v.to_string()); + } + Err(ArrowError::InvalidArgumentError(format!( + "{attr_name} attribute must be a string or JSON object" + ))) +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + fn map(json: serde_json::Value) -> serde_json::Map { + json.as_object().unwrap().clone() + } + + #[test] + fn empty_attrs_parses_to_all_none() { + let g = GroupGeoMetadata::from_attributes(&map(json!({}))).unwrap(); + assert!(g.crs.is_none()); + assert!(g.transform.is_none()); + assert!(g.spatial_dims.is_none()); + } + + #[test] + fn epsg_code_parses_to_epsg_string() { + let g = GroupGeoMetadata::from_attributes(&map(json!({"proj:epsg": 4326}))).unwrap(); + assert_eq!(g.crs.as_deref(), Some("EPSG:4326")); + } + + #[test] + fn wkt2_takes_precedence_over_epsg() { + let g = GroupGeoMetadata::from_attributes(&map(json!({ + "proj:epsg": 4326, + "proj:wkt2": "GEOGCRS[\"WGS 84\", ...]" + }))) + .unwrap(); + assert!(g.crs.as_deref().unwrap().starts_with("GEOGCRS")); + } + + #[test] + fn projjson_object_serialises_to_string() { + let g = GroupGeoMetadata::from_attributes(&map(json!({ + "proj:projjson": {"type": "GeographicCRS"} + }))) + .unwrap(); + let crs = g.crs.unwrap(); + assert!(crs.contains("GeographicCRS")); + } + + #[test] + fn transform_parses_six_floats() { + let g = GroupGeoMetadata::from_attributes(&map(json!({ + "spatial:transform": [0.0, 1.0, 0.0, 0.0, 0.0, -1.0] + }))) + .unwrap(); + assert_eq!(g.transform, Some([0.0, 1.0, 0.0, 0.0, 0.0, -1.0])); + } + + #[test] + fn transform_wrong_length_errors() { + let err = GroupGeoMetadata::from_attributes(&map(json!({ + "spatial:transform": [0.0, 1.0, 0.0] + }))) + .unwrap_err() + .to_string(); + assert!(err.contains("6 elements"), "{err}"); + } + + #[test] + fn spatial_dims_parses_string_list() { + let g = GroupGeoMetadata::from_attributes(&map(json!({ + "spatial:dims": ["y", "x"] + }))) + .unwrap(); + assert_eq!(g.spatial_dims, Some(vec!["y".to_string(), "x".to_string()])); + } + + #[test] + fn spatial_dims_non_string_errors() { + let err = GroupGeoMetadata::from_attributes(&map(json!({ + "spatial:dims": ["y", 1] + }))) + .unwrap_err() + .to_string(); + assert!(err.contains("strings"), "{err}"); + } +} diff --git a/rust/sedona-raster-zarr/src/lib.rs b/rust/sedona-raster-zarr/src/lib.rs new file mode 100644 index 000000000..1d35d29af --- /dev/null +++ b/rust/sedona-raster-zarr/src/lib.rs @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Zarr-backed N-D raster loader for SedonaDB. +//! +//! [`ZarrChunkReader`] is a `RecordBatchReader` that walks a Zarr +//! group's chunk grid lazily, emitting one raster row per chunk +//! position with one band per array. Each row carries a chunk-anchor +//! URI in `outdb_uri`; the `data` column stays empty. Metadata-only +//! operations (`count(*)`, `RS_Envelope`, `RS_Width`, …) work directly +//! against these rows; byte-consuming kernels need an async resolver +//! that's not part of this crate. +//! +//! Local filesystem stores only — `file://` URIs or bare paths. + +mod dtype; +mod geozarr; +mod loader; +mod source_uri; + +pub use loader::ZarrChunkReader; diff --git a/rust/sedona-raster-zarr/src/loader.rs b/rust/sedona-raster-zarr/src/loader.rs new file mode 100644 index 000000000..631565df2 --- /dev/null +++ b/rust/sedona-raster-zarr/src/loader.rs @@ -0,0 +1,810 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Zarr group → N-D raster streaming reader. +//! +//! [`ZarrChunkReader`] walks the group's chunk grid lazily and emits one +//! raster row per chunk position, with one band per array in the group. +//! Each row carries an `outdb_uri` chunk anchor +//! (`zarr:///#chunk=i0,i1,...`); the `data` +//! column stays empty until the async resolver lands and dereferences +//! the anchor to bytes on demand. + +use std::sync::Arc; + +use arrow_array::{RecordBatch, RecordBatchReader}; +use arrow_schema::{ArrowError, Schema, SchemaRef}; +use sedona_common::sedona_internal_datafusion_err; +use sedona_raster::builder::RasterBuilder; +use sedona_schema::datatypes::SedonaType; +use sedona_schema::raster::BandDataType; +use zarrs::array::Array; +#[cfg(test)] +use zarrs::array::ArrayBytes; +use zarrs::group::Group; +use zarrs_filesystem::FilesystemStore; + +use crate::dtype::zarr_to_band_data_type; +use crate::geozarr::GroupGeoMetadata; +use crate::source_uri::{build_chunk_anchor, group_uri_to_filesystem_path}; + +/// Streaming reader over the chunk grid of a Zarr group. +/// +/// Each `next()` call emits one `RecordBatch` containing up to +/// `batch_size` rows; one row per chunk position. The reader holds the +/// open group, parsed metadata, and the current chunk-grid position — +/// metadata parsing happens once in [`ZarrChunkReader::try_new`] and +/// the per-row work is just transform arithmetic + anchor URI +/// formatting. +/// +/// Rows always emit OutDb-style: `data` is empty, `outdb_uri` carries +/// a chunk anchor that the async OutDb resolver (registered separately) +/// resolves to bytes on demand. +pub struct ZarrChunkReader { + schema: SchemaRef, + group_uri: String, + array_infos: Vec, + geo: GroupGeoMetadata, + group_transform: [f64; 6], + spatial_dim_indices: Vec, + /// Owned copy of `array_infos[0].dim_names[i]` for `i` in + /// `spatial_dim_indices`. Kept here so `next()` doesn't need to + /// rebuild the `&str` slice each row. + spatial_dims_names: Vec, + chunk_spatial_shape: Vec, + /// Current position in the chunk grid (row-major, C-order). + chunk_indices: Vec, + /// Whether the grid has been exhausted. + exhausted: bool, + batch_size: usize, +} + +impl std::fmt::Debug for ZarrChunkReader { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.debug_struct("ZarrChunkReader") + .field("group_uri", &self.group_uri) + .field("num_arrays", &self.array_infos.len()) + .field("chunk_indices", &self.chunk_indices) + .field("exhausted", &self.exhausted) + .field("batch_size", &self.batch_size) + .finish() + } +} + +impl ZarrChunkReader { + /// Open a Zarr group and prepare a streaming reader over its chunk + /// grid. Performs all I/O for metadata up front (open group, parse + /// attributes, list arrays, validate constraints) so per-batch work + /// is cheap. + /// + /// `arrays`: + /// - `None` — read every multi-dimensional array. 1-D arrays + /// (typical xarray coord variables) are auto-skipped. + /// - `Some(names)` — read exactly the named arrays. Unknown names + /// error. 1-D arrays are always rejected (a raster band needs + /// ≥ 2 dimensions); naming one explicitly errors with a clear + /// message. + /// + /// `batch_size` controls how many chunk rows are emitted per + /// `RecordBatch`. Must be ≥ 1; callers typically pass + /// `SessionConfig::batch_size` (defaults to 8192). + pub fn try_new( + group_uri: &str, + arrays: Option<&[String]>, + batch_size: usize, + ) -> Result { + let batch_size = batch_size.max(1); + let OpenedGroup { + array_infos, + geo, + group_transform, + spatial_dim_indices, + } = open_and_validate(group_uri, arrays)?; + + let spatial_dims_names: Vec = spatial_dim_indices + .iter() + .map(|&i| array_infos[0].dim_names[i].clone()) + .collect(); + let chunk_spatial_shape: Vec = spatial_dim_indices + .iter() + .map(|&i| array_infos[0].chunk_shape[i] as i64) + .collect(); + + let raster_field = SedonaType::Raster + .to_storage_field("raster", true) + .map_err(|e| ArrowError::ExternalError(Box::new(e)))?; + let schema = Arc::new(Schema::new(vec![raster_field])); + + let chunk_indices = vec![0u64; array_infos[0].chunk_grid_shape.len()]; + + Ok(Self { + schema, + group_uri: group_uri.to_string(), + array_infos, + geo, + group_transform, + spatial_dim_indices, + spatial_dims_names, + chunk_spatial_shape, + chunk_indices, + exhausted: false, + batch_size, + }) + } + + /// Append one raster row at the current `chunk_indices` to the + /// in-progress builder. Does not advance the cursor. + fn emit_one_row(&self, builder: &mut RasterBuilder) -> Result<(), ArrowError> { + let row_transform = compute_row_transform( + &self.group_transform, + &self.chunk_indices, + &self.array_infos[0].chunk_shape, + &self.spatial_dim_indices, + ); + let spatial_dims_ref: Vec<&str> = + self.spatial_dims_names.iter().map(String::as_str).collect(); + let crs_str = self.geo.crs.as_deref(); + builder.start_raster_nd( + &row_transform, + &spatial_dims_ref, + &self.chunk_spatial_shape, + crs_str, + )?; + + for info in &self.array_infos { + let dim_names_ref: Vec<&str> = info.dim_names.iter().map(String::as_str).collect(); + let nodata_ref = info.nodata.as_deref(); + // Every band gets its chunk-anchor URI populated as + // provenance metadata. `data.is_empty()` is the InDb/OutDb + // discriminator; this reader always emits empty `data` and + // defers pixel-byte resolution to the OutDb resolver. + let anchor = build_chunk_anchor(&self.group_uri, &info.path, &self.chunk_indices); + builder.start_band_nd( + Some(info.path.as_str()), + &dim_names_ref, + &info.chunk_shape, + info.data_type, + nodata_ref, + Some(anchor.as_str()), + Some("zarr"), + )?; + builder.band_data_writer().append_value([0u8; 0]); + builder.finish_band()?; + } + builder.finish_raster() + } +} + +impl Iterator for ZarrChunkReader { + type Item = Result; + + fn next(&mut self) -> Option { + if self.exhausted { + return None; + } + let mut builder = RasterBuilder::new(self.batch_size); + let mut rows_emitted = 0usize; + while rows_emitted < self.batch_size { + if let Err(e) = self.emit_one_row(&mut builder) { + self.exhausted = true; + return Some(Err(e)); + } + rows_emitted += 1; + if !advance_chunk_indices( + &mut self.chunk_indices, + &self.array_infos[0].chunk_grid_shape, + ) { + self.exhausted = true; + break; + } + } + if rows_emitted == 0 { + return None; + } + let struct_arr = match builder.finish() { + Ok(arr) => arr, + Err(e) => return Some(Err(e)), + }; + Some(RecordBatch::try_new( + self.schema.clone(), + vec![Arc::new(struct_arr)], + )) + } +} + +impl RecordBatchReader for ZarrChunkReader { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + +/// Per-array metadata extracted once at group open and reused for every +/// chunk position. Caching this avoids re-reading Zarr metadata for each +/// of the (potentially thousands of) chunk rows. +struct ArrayInfo { + /// Array path within the store, used to build chunk anchor URIs and + /// surface in band names. + path: String, + /// SedonaDB BandDataType corresponding to this array's zarrs dtype. + data_type: BandDataType, + /// Dimension names in array order. Required to be `Some(_)` for every + /// dim; missing names error at validation time. + dim_names: Vec, + /// Inner chunk grid shape, one entry per dimension. Used to enumerate + /// chunk positions and validated to match across arrays. + chunk_grid_shape: Vec, + /// Chunk shape (elements per chunk per dim). Same for every chunk + /// position — ragged final chunks are not emitted as separate short + /// rows. + chunk_shape: Vec, + /// Encoded fill value in native-endian byte representation, for the + /// `nodata` field. None when the array has no fill value declared. + nodata: Option>, +} + +/// Bundle of group-level state returned by [`open_and_validate`]. +struct OpenedGroup { + array_infos: Vec, + geo: GroupGeoMetadata, + group_transform: [f64; 6], + spatial_dim_indices: Vec, +} + +/// Open the Zarr group, parse and validate group metadata, and return +/// everything `ZarrChunkReader` needs to iterate without further I/O +/// (apart from per-chunk byte fetches by future resolvers). +fn open_and_validate( + group_uri: &str, + arrays_filter: Option<&[String]>, +) -> Result { + let fs_path = group_uri_to_filesystem_path(group_uri)?; + let store = FilesystemStore::new(&fs_path).map_err(|e| { + ArrowError::ExternalError(Box::new(sedona_internal_datafusion_err!( + "failed to open Zarr filesystem store at {}: {e}", + fs_path.display() + ))) + })?; + let storage: Arc = Arc::new(store); + + let group = Group::open(storage.clone(), "/").map_err(|e| { + ArrowError::ExternalError(Box::new(sedona_internal_datafusion_err!( + "failed to open Zarr group at {group_uri}: {e}" + ))) + })?; + + let geo = GroupGeoMetadata::from_attributes(group.attributes())?; + + // CRS-without-transform is almost always a malformed-metadata bug — + // the user thinks they have full georef but downstream spatial joins + // will silently use the identity-pixel-coords default. Error loudly + // so they fix the metadata rather than getting empty result sets. + if geo.crs.is_some() && geo.transform.is_none() { + return Err(ArrowError::InvalidArgumentError(format!( + "Zarr group at {group_uri} declares a CRS but no `spatial:transform` \ + attribute; refusing to fall back to the identity transform because \ + that would silently produce wrong results in spatial joins. Declare \ + `spatial:transform` on the group or remove the CRS to read this as \ + a non-georeferenced datacube." + ))); + } + + let arrays = group.child_arrays().map_err(|e| { + ArrowError::ExternalError(Box::new(sedona_internal_datafusion_err!( + "failed to enumerate child arrays in {group_uri}: {e}" + ))) + })?; + if arrays.is_empty() { + return Err(ArrowError::InvalidArgumentError(format!( + "Zarr group at {group_uri} has no child arrays" + ))); + } + + let arrays = select_arrays(arrays, arrays_filter, group_uri)?; + let array_infos = collect_array_infos(arrays)?; + validate_group_constraints(&array_infos)?; + + // Spatial-dim resolution. Two configurations are accepted: + // - dim_names ends with ["y", "x"] (canonical for georeferenced + // 2-D and time-series rasters); the spatial extent is the chunk's + // last two dims. + // - `spatial:dims` attribute on the group explicitly names them. + // Anything else errors with a clear message — silently picking dims + // would produce wrong per-row transforms. + let spatial_dim_indices = + resolve_spatial_dim_indices(&array_infos[0].dim_names, geo.spatial_dims.as_deref())?; + + let group_transform = match geo.transform { + Some(t) => t, + None => { + // Both `spatial:transform` and `proj:*` are absent (the + // CRS-only case errored above). Fall back to identity pixel + // coordinates and breadcrumb a warning so spatial-join + // surprises are debuggable. + log::warn!( + "Zarr group at {group_uri} has no `spatial:transform`; falling back \ + to the identity pixel-coordinate transform [0, 1, 0, 0, 0, -1]. \ + Spatial operations against this raster will use pixel coordinates." + ); + [0.0, 1.0, 0.0, 0.0, 0.0, -1.0] + } + }; + + // chunk_grid_shape comes from untrusted Zarr metadata; bound-check the + // product so a hostile or malformed grid can't make per-batch + // RasterBuilder capacity (or downstream consumers) overflow. + array_infos[0] + .chunk_grid_shape + .iter() + .try_fold(1usize, |acc, &n| { + usize::try_from(n).ok().and_then(|n| acc.checked_mul(n)) + }) + .ok_or_else(|| { + ArrowError::InvalidArgumentError(format!( + "chunk grid shape {:?} overflows usize", + array_infos[0].chunk_grid_shape + )) + })?; + + Ok(OpenedGroup { + array_infos, + geo, + group_transform, + spatial_dim_indices, + }) +} + +/// Collect per-array metadata from open zarrs `Array` handles. +/// +/// Sorts arrays by path so band ordering across rows is deterministic +/// (zarrs's underlying store listing order is implementation-defined — +/// filesystem stores currently happen to enumerate alphabetically, but +/// that's not part of the contract we want consumers to rely on). +fn collect_array_infos( + mut arrays: Vec>, +) -> Result, ArrowError> { + arrays.sort_by(|a, b| a.path().as_str().cmp(b.path().as_str())); + let mut out = Vec::with_capacity(arrays.len()); + for array in arrays { + let path = array.path().to_string(); + let data_type = zarr_to_band_data_type(array.data_type())?; + let dim_names = resolve_dim_names(&array, &path)?; + let chunk_grid_shape = array.chunk_grid_shape().to_vec(); + let chunk_shape = array + .chunk_shape(&vec![0u64; chunk_grid_shape.len()]) + .map_err(|e| { + ArrowError::ExternalError(Box::new(sedona_internal_datafusion_err!( + "array {path}: failed to query chunk shape: {e}" + ))) + })? + .iter() + .map(|n| n.get()) + .collect(); + let fill_bytes = array.fill_value().as_ne_bytes(); + let nodata = if fill_bytes.is_empty() { + None + } else { + Some(fill_bytes.to_vec()) + }; + out.push(ArrayInfo { + path, + data_type, + dim_names, + chunk_grid_shape, + chunk_shape, + nodata, + }); + } + Ok(out) +} + +/// Apply the array-selection rules. 1-D arrays (typical xarray-style +/// coord variables) are always dropped — a raster band requires at +/// least 2 dimensions, so reading a 1-D array could never succeed. +/// - Explicit filter: keep arrays whose path (with leading `/` stripped) +/// matches one of the requested names. Unknown names error so users +/// don't silently get an empty group from a typo; naming a 1-D array +/// errors with a clear message rather than producing a confusing +/// "no spatial axes" failure downstream. +/// - No filter: read every multi-dimensional array. +fn select_arrays( + arrays: Vec>, + filter: Option<&[String]>, + group_uri: &str, +) -> Result>, ArrowError> { + if let Some(names) = filter { + let available: Vec = arrays + .iter() + .map(|a| a.path().as_str().trim_start_matches('/').to_string()) + .collect(); + for requested in names { + let needle = requested.trim_start_matches('/'); + match arrays + .iter() + .find(|a| a.path().as_str().trim_start_matches('/') == needle) + { + None => { + return Err(ArrowError::InvalidArgumentError(format!( + "Zarr group at {group_uri} has no array named {requested:?}; \ + available arrays: {available:?}" + ))); + } + Some(a) if a.shape().len() < 2 => { + return Err(ArrowError::InvalidArgumentError(format!( + "array {requested:?} has rank {} (shape {:?}); a raster band \ + requires at least 2 dimensions and cannot be read.", + a.shape().len(), + a.shape() + ))); + } + Some(_) => {} + } + } + let kept: Vec<_> = arrays + .into_iter() + .filter(|a| { + let path = a.path().as_str().trim_start_matches('/').to_string(); + names.iter().any(|n| n.trim_start_matches('/') == path) + }) + .collect(); + return Ok(kept); + } + + let kept: Vec<_> = arrays.into_iter().filter(|a| a.shape().len() > 1).collect(); + if kept.is_empty() { + return Err(ArrowError::InvalidArgumentError(format!( + "Zarr group at {group_uri} contains only 1-D arrays (typical \ + xarray-style coord variables); a raster band requires at least \ + 2 dimensions, so this group has nothing readable as a raster" + ))); + } + Ok(kept) +} + +/// Resolve dimension names for an array, supporting both Zarr v3 +/// (first-class `dimension_names` field) and Zarr v2 with the xarray +/// `_ARRAY_DIMENSIONS` attribute. Errors if neither carries a complete +/// set of named dimensions matching the array's rank. +fn resolve_dim_names(array: &Array, path: &str) -> Result, ArrowError> { + let rank = array.shape().len(); + + if let Some(names) = array.dimension_names() { + return names + .iter() + .enumerate() + .map(|(i, n)| { + n.clone().ok_or_else(|| { + ArrowError::InvalidArgumentError(format!( + "array {path}: dimension {i} has no name; every Zarr array \ + dimension must be named", + )) + }) + }) + .collect(); + } + + // Zarr v2 fallback: xarray's `_ARRAY_DIMENSIONS` convention. v2 had no + // first-class dimension_names field and zarrs's v2->v3 converter + // preserves attributes verbatim without lifting this one. + if let Some(value) = array.attributes().get("_ARRAY_DIMENSIONS") { + let arr = value.as_array().ok_or_else(|| { + ArrowError::InvalidArgumentError(format!( + "array {path}: _ARRAY_DIMENSIONS must be a JSON array of strings; got {value}" + )) + })?; + if arr.len() != rank { + return Err(ArrowError::InvalidArgumentError(format!( + "array {path}: _ARRAY_DIMENSIONS has {} entries but array has rank {rank}", + arr.len() + ))); + } + return arr + .iter() + .enumerate() + .map(|(i, v)| { + v.as_str().map(str::to_string).ok_or_else(|| { + ArrowError::InvalidArgumentError(format!( + "array {path}: _ARRAY_DIMENSIONS[{i}] must be a string; got {v}" + )) + }) + }) + .collect(); + } + + Err(ArrowError::InvalidArgumentError(format!( + "array {path}: no dimension names found. Zarr v3 arrays must set \ + `dimension_names`; Zarr v2 arrays must set the `_ARRAY_DIMENSIONS` \ + attribute (xarray convention)." + ))) +} + +/// Enforce group constraints. All arrays must agree on chunk grid shape, +/// chunk shape, and dimension names. We do NOT enforce shared element +/// shape (`array.shape()`) because users routinely group arrays with +/// the same chunk grid but different totals (e.g. a coord variable with +/// one fewer dim is rejected here anyway by the dim-name check). +fn validate_group_constraints(infos: &[ArrayInfo]) -> Result<(), ArrowError> { + let first = &infos[0]; + for other in &infos[1..] { + if other.chunk_grid_shape != first.chunk_grid_shape { + return Err(ArrowError::InvalidArgumentError(format!( + "arrays {} and {} have different chunk grid shapes ({:?} vs {:?}); \ + every array in the group must share the same chunk grid. \ + Pass `arrays = [...]` to read only compatible arrays.", + first.path, other.path, first.chunk_grid_shape, other.chunk_grid_shape + ))); + } + if other.chunk_shape != first.chunk_shape { + return Err(ArrowError::InvalidArgumentError(format!( + "arrays {} and {} have different chunk shapes ({:?} vs {:?}); \ + every array in the group must share the same chunk shape. \ + Pass `arrays = [...]` to read only compatible arrays.", + first.path, other.path, first.chunk_shape, other.chunk_shape + ))); + } + if other.dim_names != first.dim_names { + return Err(ArrowError::InvalidArgumentError(format!( + "arrays {} and {} have different dimension names ({:?} vs {:?}); \ + every array in the group must declare identical dim_names. \ + Pass `arrays = [...]` to read only compatible arrays.", + first.path, other.path, first.dim_names, other.dim_names + ))); + } + } + Ok(()) +} + +/// Pick the `(y_index, x_index)` axes of an array's dim_names. +/// +/// If `spatial_dims` is provided via the group's `spatial:dims` attribute, +/// look up those names by position. Otherwise, default to the last two +/// dims and require they be named `y` and `x` (in that order) — the +/// canonical GeoZarr-2D convention. Anything else errors. +fn resolve_spatial_dim_indices( + dim_names: &[String], + spatial_dims: Option<&[String]>, +) -> Result, ArrowError> { + if let Some(spatial) = spatial_dims { + let mut idx = Vec::with_capacity(spatial.len()); + for s in spatial { + let i = dim_names.iter().position(|n| n == s).ok_or_else(|| { + ArrowError::InvalidArgumentError(format!( + "spatial:dims declared name {s:?} not found in array dim_names {dim_names:?}", + )) + })?; + idx.push(i); + } + return Ok(idx); + } + let n = dim_names.len(); + if n < 2 { + return Err(ArrowError::InvalidArgumentError(format!( + "at least 2 dimensions are required to resolve spatial axes; got {dim_names:?}", + ))); + } + if dim_names[n - 2] != "y" || dim_names[n - 1] != "x" { + return Err(ArrowError::InvalidArgumentError(format!( + "the last two dim_names must be [\"y\", \"x\"] when `spatial:dims` is \ + not declared; got {dim_names:?}", + ))); + } + Ok(vec![n - 2, n - 1]) +} + +/// Per-chunk transform: translate the group's transform so the chunk's +/// `[0, 0]` element maps to the chunk's spatial origin. +fn compute_row_transform( + group_transform: &[f64; 6], + chunk_indices: &[u64], + chunk_shape: &[u64], + spatial_dim_indices: &[usize], +) -> [f64; 6] { + // GDAL GeoTransform layout: [origin_x, scale_x, skew_x, origin_y, skew_y, scale_y]. + // Translation along x = chunk_x_index × chunk_x_size in pixel-coordinate space, + // converted to world coordinates via the affine. + // + // spatial_dim_indices is validated upstream to be [y_index, x_index]. + // Index 0 is the y axis, index 1 is the x axis. + let y_axis = spatial_dim_indices[0]; + let x_axis = spatial_dim_indices[1]; + let x_offset = (chunk_indices[x_axis] * chunk_shape[x_axis]) as f64; + let y_offset = (chunk_indices[y_axis] * chunk_shape[y_axis]) as f64; + let [ox, sx, kx, oy, ky, sy] = *group_transform; + [ + ox + sx * x_offset + kx * y_offset, + sx, + kx, + oy + ky * x_offset + sy * y_offset, + ky, + sy, + ] +} + +/// Advance `chunk_indices` row-major over `chunk_grid_shape`. Returns +/// `true` while there are positions left, `false` when the grid is +/// exhausted (and the indices wrap back to all-zero). +fn advance_chunk_indices(chunk_indices: &mut [u64], chunk_grid_shape: &[u64]) -> bool { + for i in (0..chunk_indices.len()).rev() { + chunk_indices[i] += 1; + if chunk_indices[i] < chunk_grid_shape[i] { + return true; + } + chunk_indices[i] = 0; + } + false +} + +/// Retrieve a single chunk's bytes as a fresh `Vec`. +/// +/// Uses `ArrayBytes::Fixed`, so this errors for variable-length element +/// types — those don't have a `BandDataType` counterpart anyway, so the +/// dtype check in `collect_array_infos` rejects them upstream. +/// +/// This is the only pixel-byte read primitive in the crate. The loader +/// itself never calls it today — it always emits OutDb anchors — but +/// the async `RS_EnsureLoaded` resolver (follow-up PR) will. Lives +/// behind `#[cfg(test)]` until the resolver lands; the unit test below +/// exercises it so the implementation doesn't bit-rot in the +/// meantime. +#[cfg(test)] +fn retrieve_chunk_bytes( + array: &Array, + chunk_indices: &[u64], +) -> Result, ArrowError> { + let bytes = array + .retrieve_chunk::>(chunk_indices) + .map_err(|e| { + ArrowError::ExternalError(Box::new(sedona_internal_datafusion_err!( + "failed to retrieve chunk {:?} from {}: {e}", + chunk_indices, + array.path() + ))) + })?; + let raw = bytes.into_fixed().map_err(|_| { + ArrowError::InvalidArgumentError(format!( + "array {}: variable-length chunk bytes are not supported", + array.path() + )) + })?; + Ok(raw.into_owned()) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Arc; + use tempfile::TempDir; + use zarrs::array::data_type; + use zarrs::array::ArrayBuilder; + + /// Direct coverage for `retrieve_chunk_bytes`. The function is the + /// only pixel-byte read primitive in the crate today; previously it + /// was exercised through `group_to_indb_rasters` integration tests, + /// which are gone now that the loader only emits OutDb anchors. The + /// follow-up `RS_EnsureLoaded` resolver will call this directly. + #[test] + fn retrieve_chunk_bytes_returns_decoded_chunk() { + let tmp = TempDir::new().unwrap(); + let store = Arc::new(FilesystemStore::new(tmp.path()).unwrap()); + + // 1×4 UInt8 array with chunks of size [1, 2] → chunk grid [1, 2]. + let arr = ArrayBuilder::new(vec![1u64, 4u64], vec![1u64, 2u64], data_type::uint8(), 0u8) + .dimension_names(Some(["y", "x"])) + .build(store.clone(), "/band") + .unwrap(); + arr.store_metadata().unwrap(); + arr.store_chunk(&[0u64, 0u64], vec![10u8, 11u8]).unwrap(); + arr.store_chunk(&[0u64, 1u64], vec![20u8, 21u8]).unwrap(); + + let chunk_0 = retrieve_chunk_bytes(&arr, &[0, 0]).unwrap(); + assert_eq!(chunk_0, vec![10u8, 11u8]); + + let chunk_1 = retrieve_chunk_bytes(&arr, &[0, 1]).unwrap(); + assert_eq!(chunk_1, vec![20u8, 21u8]); + } + + #[test] + fn advance_chunk_indices_walks_row_major() { + // 2×3 grid; outer axis varies slowest. + let shape = vec![2u64, 3u64]; + let mut idx = vec![0u64, 0u64]; + let mut visited = vec![idx.clone()]; + while advance_chunk_indices(&mut idx, &shape) { + visited.push(idx.clone()); + } + // Expected row-major traversal of a 2×3 grid (last axis fastest): + // (0,0) (0,1) (0,2) (1,0) (1,1) (1,2) + let expected = vec![ + vec![0, 0], + vec![0, 1], + vec![0, 2], + vec![1, 0], + vec![1, 1], + vec![1, 2], + ]; + assert_eq!(visited, expected); + } + + #[test] + fn advance_chunk_indices_signals_exhaustion_via_wraparound() { + // After the last position (1,2) the next advance must return false + // and reset back to all-zero. + let shape = vec![2u64, 3u64]; + let mut idx = vec![1u64, 2u64]; + assert!(!advance_chunk_indices(&mut idx, &shape)); + assert_eq!(idx, vec![0, 0]); + } + + #[test] + fn advance_chunk_indices_single_position_grid_exits_immediately() { + let shape = vec![1u64]; + let mut idx = vec![0u64]; + assert!(!advance_chunk_indices(&mut idx, &shape)); + } + + #[test] + fn compute_row_transform_translates_to_chunk_origin_no_skew() { + // 2-D y,x array with chunk [2, 2] and group origin (10, 20). + // Chunk (1, 2) in row-major should map to origin (10 + 2*2, 20 + 2*1*(-1)) + // for transform [10, 1, 0, 20, 0, -1]: x_off = 4, y_off = 2. + let group_t = [10.0, 1.0, 0.0, 20.0, 0.0, -1.0]; + let chunk_shape = vec![2u64, 2u64]; + let chunk_idx = vec![1u64, 2u64]; + let t = compute_row_transform(&group_t, &chunk_idx, &chunk_shape, &[0, 1]); + // y_axis=0, x_axis=1 → x_off=2*2=4, y_off=1*2=2 + assert_eq!(t[0], 10.0 + 4.0); // origin_x + assert_eq!(t[3], 20.0 - 2.0); // origin_y after y_off=2 with sy=-1 + // Scale/skew carry through unchanged. + assert_eq!(t[1], 1.0); + assert_eq!(t[2], 0.0); + assert_eq!(t[4], 0.0); + assert_eq!(t[5], -1.0); + } + + #[test] + fn resolve_spatial_dim_indices_default_yx() { + let names = vec!["time".into(), "y".into(), "x".into()]; + let idx = resolve_spatial_dim_indices(&names, None).unwrap(); + assert_eq!(idx, vec![1, 2]); + } + + #[test] + fn resolve_spatial_dim_indices_default_rejects_wrong_order() { + let names = vec!["x".into(), "y".into()]; + let err = resolve_spatial_dim_indices(&names, None) + .unwrap_err() + .to_string(); + assert!(err.contains("[\"y\", \"x\"]"), "{err}"); + } + + #[test] + fn resolve_spatial_dim_indices_explicit_lookup() { + let names = vec!["lat".into(), "lon".into(), "time".into()]; + let spatial = vec!["lat".to_string(), "lon".to_string()]; + let idx = resolve_spatial_dim_indices(&names, Some(&spatial)).unwrap(); + assert_eq!(idx, vec![0, 1]); + } + + #[test] + fn resolve_spatial_dim_indices_explicit_missing_errors() { + let names = vec!["a".into(), "b".into()]; + let spatial = vec!["nope".to_string()]; + let err = resolve_spatial_dim_indices(&names, Some(&spatial)) + .unwrap_err() + .to_string(); + assert!(err.contains("nope"), "{err}"); + } +} diff --git a/rust/sedona-raster-zarr/src/source_uri.rs b/rust/sedona-raster-zarr/src/source_uri.rs new file mode 100644 index 000000000..6bd984854 --- /dev/null +++ b/rust/sedona-raster-zarr/src/source_uri.rs @@ -0,0 +1,294 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Zarr URI helpers — group locators and per-chunk OutDb anchors. +//! +//! Two URI shapes flow through the loader: +//! +//! 1. **Group URI** (the loader entry-point argument). Identifies a Zarr +//! group on a backend, e.g. `file:///tmp/datacube.zarr` or a bare +//! local path. Cloud schemes (`s3://`, `gs://`, `az://`, `https://`) +//! will be accepted by a future revision; today only `file://` and +//! bare-path are recognised. +//! 2. **Chunk anchor URI** (written into a band's `outdb_uri`). Addresses +//! one chunk in one array within a group: +//! `#array=&chunk=,,...`. The store URI +//! is the original group URI verbatim (whatever scheme that uses); +//! array path and chunk indices both live in the fragment so the +//! store URI is unambiguous even when both contain `/` (e.g. +//! `s3://bucket/foo.zarr/2024` + `subgroup/B01`). Inner-chunk indices +//! always — sharding is a storage detail the resolver handles. +//! +//! The "this is a zarr anchor" signal lives in the band's +//! `outdb_format = "zarr"` field, not in a URI scheme prefix — matches +//! the GDAL OutDb convention and lets the format-keyed dispatcher +//! route without parsing URI schemes. + +use arrow_schema::ArrowError; + +/// Parts of a chunk-anchor URI. +/// +/// `#[cfg(test)]`: no production consumer yet. The async byte +/// resolver (separate follow-up) will parse `outdb_uri` values back +/// into this struct. +#[cfg(test)] +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ChunkAnchor { + /// Original store URI for the *group* (e.g. `file:///tmp/foo.zarr`, + /// `s3://bucket/foo.zarr/2024`). Same value across every chunk of every + /// array in a group. + pub store_uri: String, + /// Array's path within the store (e.g. `temperature`, `subgroup/B01`). + pub array_path: String, + /// Chunk's position in the array's inner chunk grid (one index per + /// array dimension). + pub chunk_indices: Vec, +} + +/// Build a chunk anchor URI from its parts. +/// +/// Format: `#array=&chunk=,,...`. The +/// store URI is the original group URI verbatim; array path and chunk +/// indices live in the fragment so the store URI is unambiguous even +/// when path components themselves contain `/`. "This is zarr" lives +/// in the band's `outdb_format` field rather than in a URI scheme. +pub fn build_chunk_anchor(store_uri: &str, array_path: &str, chunk_indices: &[u64]) -> String { + let indices = chunk_indices + .iter() + .map(|i| i.to_string()) + .collect::>() + .join(","); + let array = array_path.trim_start_matches('/'); + format!("{store_uri}#array={array}&chunk={indices}") +} + +/// Parse a chunk-anchor URI back into its parts. +/// +/// Strict: rejects URIs that don't carry both `array=` and `chunk=` +/// fragment parameters with valid values. +/// +/// `#[cfg(test)]`: pairs with [`ChunkAnchor`]; resurrected when the +/// async byte resolver lands. +#[cfg(test)] +pub fn parse_chunk_anchor(uri: &str) -> Result { + let (store_uri, fragment) = uri.split_once('#').ok_or_else(|| { + ArrowError::InvalidArgumentError(format!( + "chunk anchor URI missing `#array=...&chunk=...` fragment: {uri}" + )) + })?; + + let mut array_path: Option<&str> = None; + let mut chunk_str: Option<&str> = None; + for pair in fragment.split('&') { + if let Some(v) = pair.strip_prefix("array=") { + array_path = Some(v); + } else if let Some(v) = pair.strip_prefix("chunk=") { + chunk_str = Some(v); + } + // Unknown fragment params are ignored — forward-compatible for + // future per-anchor extensions (e.g. version pins). + } + + let array_path = array_path + .ok_or_else(|| { + ArrowError::InvalidArgumentError(format!( + "chunk anchor URI fragment missing `array=` parameter: {uri}" + )) + })? + .to_string(); + let indices_str = chunk_str.ok_or_else(|| { + ArrowError::InvalidArgumentError(format!( + "chunk anchor URI fragment missing `chunk=` parameter: {uri}" + )) + })?; + if indices_str.is_empty() { + return Err(ArrowError::InvalidArgumentError(format!( + "chunk anchor URI has empty index list: {uri}" + ))); + } + let chunk_indices = indices_str + .split(',') + .map(|s| { + s.parse::().map_err(|_| { + ArrowError::InvalidArgumentError(format!( + "chunk index `{s}` is not a non-negative integer in {uri}" + )) + }) + }) + .collect::, _>>()?; + + Ok(ChunkAnchor { + store_uri: store_uri.to_string(), + array_path, + chunk_indices, + }) +} + +/// Normalize a user-supplied group URI into a local filesystem path. +/// +/// Only `file://` and bare-path URIs are supported. Cloud schemes +/// (`s3://`, `gs://`, `az://`, `https://`) error with a clear message. +pub fn group_uri_to_filesystem_path(uri: &str) -> Result { + if let Some(rest) = uri.strip_prefix("file://") { + return Ok(std::path::PathBuf::from(rest)); + } + for scheme in ["s3://", "gs://", "az://", "https://", "http://"] { + if uri.starts_with(scheme) { + return Err(ArrowError::NotYetImplemented(format!( + "cloud Zarr stores ({scheme}…) are not supported yet; \ + use a local filesystem path or `file://` URI" + ))); + } + } + // Bare path. + Ok(std::path::PathBuf::from(uri)) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn build_anchor_basic() { + let uri = build_chunk_anchor("file:///tmp/datacube.zarr", "temperature", &[47, 2, 3]); + assert_eq!( + uri, + "file:///tmp/datacube.zarr#array=temperature&chunk=47,2,3" + ); + } + + #[test] + fn build_anchor_strips_leading_slash_on_array_path() { + let uri = build_chunk_anchor("file:///tmp/foo.zarr", "/array", &[0]); + assert_eq!(uri, "file:///tmp/foo.zarr#array=array&chunk=0"); + } + + #[test] + fn build_anchor_with_nested_array_path() { + let uri = build_chunk_anchor("s3://bucket/foo.zarr/2024", "subgroup/B01", &[1, 5]); + assert_eq!( + uri, + "s3://bucket/foo.zarr/2024#array=subgroup/B01&chunk=1,5" + ); + } + + #[test] + fn parse_anchor_basic() { + let anchor = + parse_chunk_anchor("file:///tmp/datacube.zarr#array=temperature&chunk=47,2,3").unwrap(); + assert_eq!(anchor.store_uri, "file:///tmp/datacube.zarr"); + assert_eq!(anchor.array_path, "temperature"); + assert_eq!(anchor.chunk_indices, vec![47, 2, 3]); + } + + #[test] + fn parse_anchor_with_s3_store_and_nested_array() { + let anchor = + parse_chunk_anchor("s3://bucket/foo.zarr/2024#array=subgroup/B01&chunk=0,0,0").unwrap(); + assert_eq!(anchor.store_uri, "s3://bucket/foo.zarr/2024"); + assert_eq!(anchor.array_path, "subgroup/B01"); + assert_eq!(anchor.chunk_indices, vec![0, 0, 0]); + } + + #[test] + fn parse_anchor_rejects_missing_fragment() { + // No `#` means no fragment, so neither `array=` nor `chunk=` is + // present. The parser surfaces "missing fragment". + let err = parse_chunk_anchor("file:///foo.zarr") + .unwrap_err() + .to_string(); + assert!(err.contains("fragment"), "{err}"); + } + + #[test] + fn parse_anchor_rejects_missing_array_param() { + let err = parse_chunk_anchor("file:///foo.zarr#chunk=0") + .unwrap_err() + .to_string(); + assert!(err.contains("array="), "{err}"); + } + + #[test] + fn parse_anchor_rejects_missing_chunk_param() { + let err = parse_chunk_anchor("file:///foo.zarr#array=a") + .unwrap_err() + .to_string(); + assert!(err.contains("chunk="), "{err}"); + } + + #[test] + fn parse_anchor_rejects_empty_indices() { + let err = parse_chunk_anchor("file:///foo.zarr#array=a&chunk=") + .unwrap_err() + .to_string(); + assert!(err.contains("empty"), "{err}"); + } + + #[test] + fn parse_anchor_rejects_non_integer_index() { + let err = parse_chunk_anchor("file:///foo.zarr#array=a&chunk=0,abc,2") + .unwrap_err() + .to_string(); + assert!(err.contains("abc"), "{err}"); + } + + #[test] + fn parse_anchor_ignores_unknown_fragment_params() { + // Forward-compatible: extra `&key=value` pairs in the fragment + // shouldn't break parsing — they're reserved for future use. + let anchor = parse_chunk_anchor("file:///foo.zarr#array=t&chunk=0,0&version=v3").unwrap(); + assert_eq!(anchor.array_path, "t"); + assert_eq!(anchor.chunk_indices, vec![0, 0]); + } + + #[test] + fn anchor_roundtrip() { + let original = ChunkAnchor { + store_uri: "file:///tmp/foo.zarr".into(), + array_path: "subgroup/B01".into(), + chunk_indices: vec![10, 20, 30], + }; + let uri = build_chunk_anchor( + &original.store_uri, + &original.array_path, + &original.chunk_indices, + ); + let parsed = parse_chunk_anchor(&uri).unwrap(); + assert_eq!(parsed, original); + } + + #[test] + fn group_uri_file_scheme() { + let path = group_uri_to_filesystem_path("file:///tmp/foo.zarr").unwrap(); + assert_eq!(path, std::path::PathBuf::from("/tmp/foo.zarr")); + } + + #[test] + fn group_uri_bare_path() { + let path = group_uri_to_filesystem_path("/tmp/foo.zarr").unwrap(); + assert_eq!(path, std::path::PathBuf::from("/tmp/foo.zarr")); + } + + #[test] + fn group_uri_cloud_scheme_errors() { + let err = group_uri_to_filesystem_path("s3://bucket/foo.zarr") + .unwrap_err() + .to_string(); + assert!(err.contains("s3://"), "{err}"); + assert!(err.contains("not supported"), "{err}"); + } +} diff --git a/rust/sedona-raster-zarr/tests/zarr_roundtrip.rs b/rust/sedona-raster-zarr/tests/zarr_roundtrip.rs new file mode 100644 index 000000000..7da73b753 --- /dev/null +++ b/rust/sedona-raster-zarr/tests/zarr_roundtrip.rs @@ -0,0 +1,376 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! End-to-end fixture test: build a small Zarr group on disk with the +//! `zarrs` crate, then read it back through `read_all` and +//! verify the resulting raster `StructArray`. The loader always emits +//! OutDb-style rows (empty `data`, populated `outdb_uri`), so these +//! tests assert on metadata and chunk-anchor URIs rather than pixel +//! bytes. Pixel-byte coverage lives in the loader's unit tests against +//! `retrieve_chunk_bytes`. + +use std::sync::Arc; + +use arrow_array::cast::AsArray; +use arrow_array::StructArray; +use arrow_schema::ArrowError; +use sedona_raster::array::RasterStructArray; +use sedona_raster::traits::RasterRef; +use sedona_raster_zarr::ZarrChunkReader; + +/// Drain a `ZarrChunkReader` into a single `StructArray`. Fixtures in +/// this file are small (≤8 chunk rows) so they fit in one batch with a +/// generous batch_size. Anything bigger would need `arrow::compute::concat`. +fn read_all(uri: &str, arrays: Option<&[String]>) -> Result { + let reader = ZarrChunkReader::try_new(uri, arrays, 1024)?; + let batches: Vec<_> = reader.collect::, _>>()?; + assert!( + batches.len() <= 1, + "test helper assumes ≤1 batch; got {} — bump batch_size or add concat", + batches.len() + ); + let batch = batches.into_iter().next().ok_or_else(|| { + ArrowError::InvalidArgumentError("ZarrChunkReader produced zero batches".into()) + })?; + Ok(batch.column(0).as_struct().clone()) +} +use sedona_schema::raster::BandDataType; +use tempfile::TempDir; +use zarrs::array::data_type; +use zarrs::array::ArrayBuilder; +use zarrs::group::GroupBuilder; +use zarrs_filesystem::FilesystemStore; + +/// Build a 2-band group on disk: +/// - dims: [t, y, x] +/// - shape: [2, 4, 4] +/// - chunks: [1, 2, 2] → chunk grid [2, 2, 2] = 8 chunk positions +/// - arrays: "temperature" (UInt8) and "pressure" (UInt8) +/// +/// Returns the temp dir (kept alive by the caller so files persist). +fn build_fixture() -> TempDir { + let tmp = TempDir::new().unwrap(); + let store = Arc::new(FilesystemStore::new(tmp.path()).unwrap()); + + // Group with a known affine transform so we can verify per-chunk + // transforms below. + let mut group_attrs = serde_json::Map::new(); + group_attrs.insert( + "spatial:transform".into(), + serde_json::json!([100.0, 1.0, 0.0, 200.0, 0.0, -1.0]), + ); + group_attrs.insert("proj:epsg".into(), serde_json::json!(4326)); + GroupBuilder::new() + .attributes(group_attrs) + .build(store.clone(), "/") + .unwrap() + .store_metadata() + .unwrap(); + + for name in ["temperature", "pressure"] { + let array = ArrayBuilder::new( + vec![2u64, 4u64, 4u64], + vec![1u64, 2u64, 2u64], + data_type::uint8(), + 0u8, + ) + .dimension_names(Some(["t", "y", "x"])) + .build(store.clone(), &format!("/{name}")) + .unwrap(); + array.store_metadata().unwrap(); + + // Bytes don't matter for these tests — the loader doesn't read + // them. Write zeros so the chunk files exist for any future + // resolver fixture. + for t in 0..2u64 { + for yc in 0..2u64 { + for xc in 0..2u64 { + array.store_chunk(&[t, yc, xc], vec![0u8; 4]).unwrap(); + } + } + } + } + + tmp +} + +#[test] +fn round_trip_emits_one_row_per_chunk_position_with_outdb_anchors() { + let tmp = build_fixture(); + let uri = format!("file://{}", tmp.path().display()); + let arr = read_all(&uri, None).unwrap(); + + let rasters = RasterStructArray::new(&arr); + assert_eq!(rasters.len(), 8, "expected 8 chunk rows (2*2*2)"); + + // First row corresponds to chunk (t=0, y=0, x=0). With group transform + // [100, 1, 0, 200, 0, -1] and chunk shape [1, 2, 2], chunk (0,0,0) has + // origin (100, 200) and spatial_shape [2, 2]. + let r0 = rasters.get(0).unwrap(); + let r0_transform: Vec = r0.transform().to_vec(); + assert_eq!(r0_transform, vec![100.0, 1.0, 0.0, 200.0, 0.0, -1.0]); + assert_eq!(r0.spatial_shape(), &[2, 2]); + assert_eq!(r0.num_bands(), 2); + assert_eq!(r0.crs(), Some("EPSG:4326")); + + // Bands are sorted by array path for determinism — `pressure` sorts + // before `temperature` lexicographically, so band 0 is pressure and + // band 1 is temperature. + let pressure = r0.band(0).unwrap(); + assert_eq!(pressure.raw_source_shape(), &[1, 2, 2]); + assert_eq!(pressure.data_type(), BandDataType::UInt8); + assert!( + !pressure.is_indb(), + "loader emits OutDb rows — is_indb() must be false" + ); + assert_eq!(pressure.outdb_format(), Some("zarr")); + let anchor = pressure.outdb_uri().expect("outdb_uri set"); + // Anchor is the group URI verbatim plus a fragment carrying array + // path + chunk indices. No `zarr://` prefix. + assert!(anchor.starts_with("file://"), "got: {anchor}"); + assert!(!anchor.starts_with("zarr://"), "got: {anchor}"); + assert!(anchor.contains("#array=pressure"), "got: {anchor}"); + assert!(anchor.contains("&chunk=0,0,0"), "got: {anchor}"); + + // Last row corresponds to chunk (t=1, y=1, x=1). Anchor + transform + // both reflect the translation. + let last = rasters.get(7).unwrap(); + let last_transform: Vec = last.transform().to_vec(); + assert_eq!(last_transform[0], 100.0 + 2.0); // x_off = 2 + assert_eq!(last_transform[3], 200.0 - 2.0); // y_off = 2, sy = -1 + let temp = last.band(1).unwrap(); + let anchor = temp.outdb_uri().expect("outdb_uri set"); + assert!(anchor.contains("#array=temperature"), "got: {anchor}"); + assert!(anchor.contains("&chunk=1,1,1"), "got: {anchor}"); +} + +#[test] +fn errors_on_empty_group() { + let tmp = TempDir::new().unwrap(); + let store = Arc::new(FilesystemStore::new(tmp.path()).unwrap()); + GroupBuilder::new() + .build(store.clone(), "/") + .unwrap() + .store_metadata() + .unwrap(); + let uri = format!("file://{}", tmp.path().display()); + let err = read_all(&uri, None).unwrap_err().to_string(); + assert!(err.contains("no child arrays"), "got: {err}"); +} + +/// Build a group with two 3-D data arrays and 1-D `t`/`y`/`x` coord +/// variables alongside them — the xarray-on-Zarr pattern. The loader's +/// default behaviour must drop the coord variables and read only the +/// data arrays. +fn build_xarray_style_fixture() -> TempDir { + let tmp = TempDir::new().unwrap(); + let store = Arc::new(FilesystemStore::new(tmp.path()).unwrap()); + GroupBuilder::new() + .build(store.clone(), "/") + .unwrap() + .store_metadata() + .unwrap(); + + // Two 3-D data arrays sharing the same chunk grid. + for name in ["temperature", "pressure"] { + let arr = ArrayBuilder::new( + vec![2u64, 4u64, 4u64], + vec![1u64, 2u64, 2u64], + data_type::uint8(), + 0u8, + ) + .dimension_names(Some(["t", "y", "x"])) + .build(store.clone(), &format!("/{name}")) + .unwrap(); + arr.store_metadata().unwrap(); + } + + // 1-D coord variables. Different chunk grid than the data arrays — + // would trip validate_group_constraints if not auto-skipped. + for (name, len, dim) in [("t", 2u64, "t"), ("y", 4u64, "y"), ("x", 4u64, "x")] { + let arr = ArrayBuilder::new(vec![len], vec![len], data_type::uint8(), 0u8) + .dimension_names(Some([dim])) + .build(store.clone(), &format!("/{name}")) + .unwrap(); + arr.store_metadata().unwrap(); + } + + tmp +} + +#[test] +fn auto_skips_1d_coord_variables() { + let tmp = build_xarray_style_fixture(); + let uri = format!("file://{}", tmp.path().display()); + let arr = read_all(&uri, None).unwrap(); + let rasters = RasterStructArray::new(&arr); + // 2*2*2 = 8 chunk positions, with 2 bands per row (pressure, temperature). + assert_eq!(rasters.len(), 8); + let r0 = rasters.get(0).unwrap(); + assert_eq!(r0.num_bands(), 2); +} + +#[test] +fn explicit_arrays_filter_selects_subset() { + let tmp = build_xarray_style_fixture(); + let uri = format!("file://{}", tmp.path().display()); + let filter = vec!["temperature".to_string()]; + let arr = read_all(&uri, Some(&filter)).unwrap(); + let rasters = RasterStructArray::new(&arr); + assert_eq!(rasters.len(), 8); + let r0 = rasters.get(0).unwrap(); + assert_eq!(r0.num_bands(), 1, "only temperature should be read"); +} + +#[test] +fn explicit_arrays_filter_rejects_unknown_name() { + let tmp = build_xarray_style_fixture(); + let uri = format!("file://{}", tmp.path().display()); + let filter = vec!["humidity".to_string()]; + let err = read_all(&uri, Some(&filter)).unwrap_err().to_string(); + assert!(err.contains("humidity"), "got: {err}"); + assert!(err.contains("no array named"), "got: {err}"); +} + +#[test] +fn errors_when_crs_declared_without_transform() { + // CRS-without-transform is almost certainly malformed metadata — + // the user thinks they have full georef but downstream spatial + // joins would silently use the identity pixel transform. The + // loader refuses rather than producing wrong results. + let tmp = TempDir::new().unwrap(); + let store = Arc::new(FilesystemStore::new(tmp.path()).unwrap()); + let mut group_attrs = serde_json::Map::new(); + group_attrs.insert("proj:epsg".into(), serde_json::json!(4326)); + GroupBuilder::new() + .attributes(group_attrs) + .build(store.clone(), "/") + .unwrap() + .store_metadata() + .unwrap(); + ArrayBuilder::new(vec![2u64, 2], vec![1u64, 2], data_type::uint8(), 0u8) + .dimension_names(Some(["y", "x"])) + .build(store.clone(), "/temperature") + .unwrap() + .store_metadata() + .unwrap(); + + let uri = format!("file://{}", tmp.path().display()); + let err = read_all(&uri, None).unwrap_err().to_string(); + assert!(err.contains("CRS"), "got: {err}"); + assert!(err.contains("spatial:transform"), "got: {err}"); +} + +#[test] +fn explicit_arrays_filter_rejects_1d_arrays() { + // A user explicitly naming a 1-D array gets a clear "needs 2 dims" + // error at parse time, not a confusing downstream spatial-dim + // resolution failure. + let tmp = build_xarray_style_fixture(); + let uri = format!("file://{}", tmp.path().display()); + let filter = vec!["t".to_string()]; + let err = read_all(&uri, Some(&filter)).unwrap_err().to_string(); + assert!(err.contains("\"t\""), "got: {err}"); + assert!(err.contains("rank 1"), "got: {err}"); + assert!(err.contains("at least 2 dimensions"), "got: {err}"); +} + +#[test] +fn errors_when_group_has_only_1d_arrays() { + let tmp = TempDir::new().unwrap(); + let store = Arc::new(FilesystemStore::new(tmp.path()).unwrap()); + GroupBuilder::new() + .build(store.clone(), "/") + .unwrap() + .store_metadata() + .unwrap(); + ArrayBuilder::new(vec![4u64], vec![2u64], data_type::uint8(), 0u8) + .dimension_names(Some(["x"])) + .build(store.clone(), "/x") + .unwrap() + .store_metadata() + .unwrap(); + + let uri = format!("file://{}", tmp.path().display()); + let err = read_all(&uri, None).unwrap_err().to_string(); + assert!(err.contains("only 1-D arrays"), "got: {err}"); +} + +#[test] +fn falls_back_to_array_dimensions_attribute() { + // Simulates a Zarr v2 array (or any v3 array that lacks a first-class + // `dimension_names` field) by leaving `.dimension_names(None)` and + // setting xarray's `_ARRAY_DIMENSIONS` attribute instead. The loader + // must accept it and treat the attribute as authoritative. + let tmp = TempDir::new().unwrap(); + let store = Arc::new(FilesystemStore::new(tmp.path()).unwrap()); + + GroupBuilder::new() + .build(store.clone(), "/") + .unwrap() + .store_metadata() + .unwrap(); + + let mut attrs = serde_json::Map::new(); + attrs.insert("_ARRAY_DIMENSIONS".into(), serde_json::json!(["y", "x"])); + let array = ArrayBuilder::new(vec![2u64, 2], vec![1u64, 2], data_type::uint8(), 0u8) + .attributes(attrs) + .build(store.clone(), "/temperature") + .unwrap(); + array.store_metadata().unwrap(); + + let uri = format!("file://{}", tmp.path().display()); + let arr = read_all(&uri, None).unwrap(); + let rasters = RasterStructArray::new(&arr); + assert_eq!(rasters.len(), 2); + let r0 = rasters.get(0).unwrap(); + let band = r0.band(0).unwrap(); + // Anchor URI populated even though the array used the v2 dim-name + // fallback — proves the fallback feeds the rest of the pipeline. + assert_eq!(band.outdb_format(), Some("zarr")); + assert!(band.outdb_uri().unwrap().contains("#array=temperature")); +} + +#[test] +fn errors_on_mismatched_chunk_grids() { + let tmp = TempDir::new().unwrap(); + let store = Arc::new(FilesystemStore::new(tmp.path()).unwrap()); + GroupBuilder::new() + .build(store.clone(), "/") + .unwrap() + .store_metadata() + .unwrap(); + ArrayBuilder::new(vec![4u64, 4], vec![2u64, 2], data_type::uint8(), 0u8) + .dimension_names(Some(["y", "x"])) + .build(store.clone(), "/array_a") + .unwrap() + .store_metadata() + .unwrap(); + ArrayBuilder::new(vec![4u64, 4], vec![4u64, 4], data_type::uint8(), 0u8) + .dimension_names(Some(["y", "x"])) + .build(store.clone(), "/array_b") + .unwrap() + .store_metadata() + .unwrap(); + + let uri = format!("file://{}", tmp.path().display()); + let err = read_all(&uri, None).unwrap_err().to_string(); + assert!( + err.contains("chunk") && err.contains("array_a") && err.contains("array_b"), + "got: {err}" + ); +} diff --git a/rust/sedona/src/context.rs b/rust/sedona/src/context.rs index 156b5b355..8fc8e3342 100644 --- a/rust/sedona/src/context.rs +++ b/rust/sedona/src/context.rs @@ -50,7 +50,7 @@ use sedona_common::{ option::add_sedona_option_extension, sedona_internal_datafusion_err, CrsProviderOption, SedonaOptions, }; -use sedona_datasource::provider::external_listing_table; +use sedona_datasource::provider::external_table; use sedona_datasource::spec::ExternalFormatSpec; use sedona_expr::scalar_udf::IntoScalarKernelRefs; use sedona_expr::{aggregate_udf::IntoSedonaAccumulatorRefs, function_set::FunctionSet}; @@ -412,12 +412,12 @@ impl SedonaContext { .map(|(k, v)| (k.clone(), v.clone())) .collect::>(); let spec = spec.with_options(&options_without_filesystems)?; - external_listing_table(spec, &self.ctx, urls, check_extension).await? + external_table(spec, &self.ctx, urls, check_extension).await? } else { - external_listing_table(spec, &self.ctx, urls, check_extension).await? + external_table(spec, &self.ctx, urls, check_extension).await? }; - self.ctx.read_table(Arc::new(provider)) + self.ctx.read_table(provider) } }