Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ wkb = "0.9.2"
wkt = "0.14.0"
zarrs = { version = "0.23", default-features = false }
zarrs_filesystem = "0.3"
zarrs_object_store = "0.5"

# Workspace path dependencies for internal crates
sedona = { version = "0.4.0", path = "rust/sedona" }
Expand Down
3 changes: 3 additions & 0 deletions python/sedonadb-zarr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,8 @@ doc = false

[dependencies]
arrow-array = { workspace = true, features = ["ffi"] }
object_store = { workspace = true, features = ["aws", "gcp", "azure", "http"] }
pyo3 = { version = "0.25.1" }
sedona-raster-zarr = { workspace = true }
tokio = { workspace = true, features = ["rt"] }
url = { workspace = true }
108 changes: 105 additions & 3 deletions python/sedonadb-zarr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@
//! Python-side `ZarrFormatSpec`.

use std::ffi::CString;
use std::sync::Mutex;
use std::sync::{Arc, Mutex};

use arrow_array::ffi_stream::FFI_ArrowArrayStream;
use object_store::ObjectStore;
use pyo3::exceptions::{PyRuntimeError, PyValueError};
use pyo3::prelude::*;
use pyo3::types::PyCapsule;
use sedona_raster_zarr::ZarrChunkReader;
use sedona_raster_zarr::{open_storage_from_uri, ZarrChunkReader};
use tokio::runtime::Builder;
use url::Url;

/// Single-use `__arrow_c_stream__` wrapper around `ZarrChunkReader`.
#[pyclass]
Expand All @@ -39,7 +42,24 @@ impl PyZarrChunkReader {
#[new]
#[pyo3(signature = (uri, arrays=None, batch_size=8192))]
fn new(uri: &str, arrays: Option<Vec<String>>, batch_size: usize) -> PyResult<Self> {
let reader = ZarrChunkReader::try_new(uri, arrays.as_deref(), batch_size)
let store =
default_object_store_for_uri(uri).map_err(|e| PyValueError::new_err(e.to_string()))?;
let storage =
open_storage_from_uri(uri, store).map_err(|e| PyValueError::new_err(e.to_string()))?;
// The crate's async-native loader exposes `try_new` as an
// `async fn`; the Python FFI is a sync constructor by
// contract, so we bridge with an ad-hoc current-thread tokio
// runtime here. `next()` on the returned reader is pure CPU
// and stays synchronous.
Comment on lines +49 to +53
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably fine, although you could also make a shared (global/static) runtime for your whole Python package if this becomes an issue.

let runtime = Builder::new_current_thread()
.enable_all()
.build()
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
let arrays_ref = arrays.as_deref();
let reader = runtime
.block_on(ZarrChunkReader::try_new(
storage, uri, arrays_ref, batch_size,
))
.map_err(|e| PyValueError::new_err(e.to_string()))?;
Ok(Self {
inner: Mutex::new(Some(reader)),
Expand Down Expand Up @@ -70,6 +90,88 @@ impl PyZarrChunkReader {
}
}

/// Build an `Arc<dyn ObjectStore>` for `uri` using env-var credential
/// discovery, per scheme. This is the temporary bridge that lets the
/// Python FFI work without going through `con.read_format`; it
/// reaches into `AWS_*` / `GOOGLE_*` / `AZURE_*` environment
/// variables via the `object_store` per-backend `Builder::from_env`
/// helpers.
///
/// Scheme dispatch:
/// - `file://` and bare paths (no `://`): a `LocalFileSystem`.
/// - `s3`: `AmazonS3Builder::from_env().with_url(uri)`.
/// - `gs` / `gcs`: `GoogleCloudStorageBuilder::from_env().with_url(uri)`.
/// - `az` / `abfs` / `abfss`: `MicrosoftAzureBuilder::from_env().with_url(uri)`.
/// - `http` / `https`: `HttpBuilder` rooted at scheme + authority only.
///
/// # Errors
///
/// Returns a `PyValueError` when `uri` contains `://` but does not
/// parse as a URL, when the scheme is not one of the above, or when
/// the backend builder fails (message produced by `build_err`).
///
/// **Slated for removal.** When the host wheel surfaces
/// `args.src.store` to `open_reader` via the `FFI_ObjectStore`
/// capsule machinery (in progress — see
/// <https://github.com/apache/sedona-db/pull/890>), this function (and
/// the `object_store` `aws`/`gcp`/`azure`/`http` features that back it)
/// gets deleted — the store will arrive credentialed from the host's
/// `ObjectStoreRegistry` and `PyZarrChunkReader::new` will extract it
/// from the capsule instead. For `file://` and bare paths the
/// returned store is a real `LocalFileSystem` (not a placeholder); the
/// caller hands it to `open_storage_from_uri` like any other backend.
fn default_object_store_for_uri(uri: &str) -> Result<Arc<dyn ObjectStore>, PyErr> {
// file:// and bare paths use a LocalFileSystem rooted at `/`;
// open_storage_from_uri prefixes it at the group's path. This
// matches the store the host's ObjectStoreRegistry yields for
// file://, so the loader treats local and cloud uniformly.
//
// NOTE(review): this guard is a case-sensitive string test, while the
// match below lower-cases the scheme. A URI spelled `FILE://...`
// would presumably skip this branch and land in the catch-all arm,
// whose message lists `file` as a supported scheme — confirm whether
// a `"file"` arm is wanted.
if uri.starts_with("file://") || !uri.contains("://") {
return Ok(Arc::new(object_store::local::LocalFileSystem::new()));
}
// Anything that reaches this point contains `://`, so it must parse
// as a URL before the scheme can be inspected.
let url = Url::parse(uri)
.map_err(|e| PyValueError::new_err(format!("group URI {uri:?} is not a valid URL: {e}")))?;
match url.scheme().to_ascii_lowercase().as_str() {
Comment on lines +110 to +120
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are some tests in the sedona crate (I think) that smoke test these openers that you can steal. Alternatively, include fewer options since we'll remove some (not sure HTTP applies, unless there's some directory listing stuff I don't know about, and gcs/azure could probably be dropped for now).

// The three cloud arms share one shape: start from the environment,
// apply the full URI, build, and map failures through `build_err`.
"s3" => {
use object_store::aws::AmazonS3Builder;
let store = AmazonS3Builder::from_env()
.with_url(uri)
.build()
.map_err(|e| PyValueError::new_err(build_err("s3", uri, e)))?;
Ok(Arc::new(store))
}
"gs" | "gcs" => {
use object_store::gcp::GoogleCloudStorageBuilder;
let store = GoogleCloudStorageBuilder::from_env()
.with_url(uri)
.build()
.map_err(|e| PyValueError::new_err(build_err("gcs", uri, e)))?;
Ok(Arc::new(store))
}
"az" | "abfs" | "abfss" => {
use object_store::azure::MicrosoftAzureBuilder;
let store = MicrosoftAzureBuilder::from_env()
.with_url(uri)
.build()
.map_err(|e| PyValueError::new_err(build_err("azure", uri, e)))?;
Ok(Arc::new(store))
}
"http" | "https" => {
use object_store::http::HttpBuilder;
// open_storage_from_uri applies the path as a PrefixStore,
// so the HttpStore must be rooted at scheme+authority only
// — unlike S3/GCS/Azure, HttpBuilder roots at whatever URL
// it's given, so hand it the authority without the path.
let authority = format!("{}://{}", url.scheme(), url.authority());
// Unlike the cloud arms, this builder starts from `new()`, not
// `from_env()`.
let store = HttpBuilder::new()
.with_url(authority)
.build()
.map_err(|e| PyValueError::new_err(build_err("http", uri, e)))?;
Ok(Arc::new(store))
}
// Any other scheme is rejected with the list of accepted ones.
other => Err(PyValueError::new_err(format!(
"unsupported Zarr URI scheme {other:?}; expected one of: \
file, s3, gs, gcs, az, abfs, abfss, http, https"
))),
}
}

/// Render the user-facing message for a failed object-store build.
///
/// The message names the `backend` label and the offending `uri`,
/// includes the underlying `object_store` error via its `Display`
/// output, and ends with a fixed hint listing the credential
/// environment variables for s3, gcs and azure. The same hint is
/// appended for every backend label.
fn build_err(backend: &str, uri: &str, err: object_store::Error) -> String {
    // Fixed trailer; the `\` continuations swallow the newline and the
    // leading whitespace of the next line, so this is one line of text.
    const CREDENTIAL_HINT: &str = "Provide credentials via standard environment variables \
        (AWS_ACCESS_KEY_ID/AWS_REGION for s3, GOOGLE_SERVICE_ACCOUNT_KEY \
        for gcs, AZURE_STORAGE_ACCOUNT_NAME/AZURE_STORAGE_ACCOUNT_KEY \
        for azure).";

    format!("failed to build {backend} object_store for {uri}: {err}. {CREDENTIAL_HINT}")
}

#[pymodule]
fn _lib(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<PyZarrChunkReader>()?;
Expand Down
13 changes: 11 additions & 2 deletions rust/sedona-raster-zarr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,26 @@ result_large_err = "allow"
[dependencies]
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
async-trait = { workspace = true }
datafusion = { workspace = true, default-features = false }
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you can avoid this one by using a subcrate, the compile will probably be faster / wheel will probably be smaller

datafusion-common = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
object_store = { workspace = true }
sedona-common = { workspace = true }
sedona-raster = { workspace = true }
sedona-schema = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
zarrs = { workspace = true, features = ["filesystem", "gzip", "zstd", "blosc", "crc32c", "sharding", "transpose"] }
zarrs_filesystem = { workspace = true }
tokio = { workspace = true }
url = { workspace = true }
zarrs = { workspace = true, features = ["async", "gzip", "zstd", "blosc", "crc32c", "sharding", "transpose"] }
zarrs_object_store = { workspace = true }

[dev-dependencies]
object_store = { workspace = true, features = ["aws", "gcp", "azure", "http"] }
tempfile = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
# Test fixtures write Zarr groups to a temp dir; the loader itself
# reads everything through object_store, so this is dev-only.
zarrs_filesystem = { workspace = true }
1 change: 1 addition & 0 deletions rust/sedona-raster-zarr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,4 @@ mod loader;
mod source_uri;

pub use loader::ZarrChunkReader;
pub use source_uri::open_storage_from_uri;
Loading
Loading