Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ wkb = "0.9.2"
wkt = "0.14.0"
zarrs = { version = "0.23", default-features = false }
zarrs_filesystem = "0.3"
zarrs_object_store = "0.5"

# Workspace path dependencies for internal crates
sedona = { version = "0.4.0", path = "rust/sedona" }
Expand Down
3 changes: 3 additions & 0 deletions python/sedonadb-zarr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,8 @@ doc = false

[dependencies]
arrow-array = { workspace = true, features = ["ffi"] }
object_store = { workspace = true, features = ["aws", "http"] }
pyo3 = { version = "0.25.1" }
sedona-raster-zarr = { workspace = true }
tokio = { workspace = true, features = ["rt"] }
url = { workspace = true }
104 changes: 101 additions & 3 deletions python/sedonadb-zarr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,32 @@
//! Python-side `ZarrFormatSpec`.

use std::ffi::CString;
use std::sync::Mutex;
use std::sync::{Arc, Mutex, OnceLock};

use arrow_array::ffi_stream::FFI_ArrowArrayStream;
use object_store::ObjectStore;
use pyo3::exceptions::{PyRuntimeError, PyValueError};
use pyo3::prelude::*;
use pyo3::types::PyCapsule;
use sedona_raster_zarr::ZarrChunkReader;
use sedona_raster_zarr::{open_storage_from_uri, ZarrChunkReader};
use tokio::runtime::{Builder, Runtime};
use url::Url;

/// Process-wide tokio runtime backing the sync Python FFI bridge.
///
/// `ZarrChunkReader::try_new` is async, but the Python constructor is sync
/// by contract, so we `block_on` here. Building a runtime per reader is
/// wasteful; one shared runtime serves every open in the package. `next()`
/// on the returned reader is pure CPU and never touches this runtime.
fn shared_runtime() -> &'static Runtime {
static RUNTIME: OnceLock<Runtime> = OnceLock::new();
RUNTIME.get_or_init(|| {
Builder::new_current_thread()
.enable_all()
.build()
.expect("build sedonadb-zarr tokio runtime")
})
}

/// Single-use `__arrow_c_stream__` wrapper around `ZarrChunkReader`.
#[pyclass]
Expand All @@ -39,7 +58,19 @@ impl PyZarrChunkReader {
#[new]
#[pyo3(signature = (uri, arrays=None, batch_size=8192))]
fn new(uri: &str, arrays: Option<Vec<String>>, batch_size: usize) -> PyResult<Self> {
let reader = ZarrChunkReader::try_new(uri, arrays.as_deref(), batch_size)
let store =
default_object_store_for_uri(uri).map_err(|e| PyValueError::new_err(e.to_string()))?;
let storage =
open_storage_from_uri(uri, store).map_err(|e| PyValueError::new_err(e.to_string()))?;
// The crate's async-native loader exposes `try_new` as an
// `async fn`; the Python FFI is a sync constructor by contract,
// so we bridge by blocking on the shared package runtime.
// `next()` on the returned reader is pure CPU and stays synchronous.
let arrays_ref = arrays.as_deref();
let reader = shared_runtime()
.block_on(ZarrChunkReader::try_new(
storage, uri, arrays_ref, batch_size,
))
.map_err(|e| PyValueError::new_err(e.to_string()))?;
Ok(Self {
inner: Mutex::new(Some(reader)),
Expand Down Expand Up @@ -70,6 +101,73 @@ impl PyZarrChunkReader {
}
}

/// Build an `Arc<dyn ObjectStore>` for `uri` using env-var credential
/// discovery, per scheme. This is the temporary bridge that lets the
/// Python FFI work without going through `con.read_format`; it
/// reaches into `AWS_*` environment variables via the `object_store`
/// per-backend `Builder::from_env` helpers.
///
/// Only the schemes exercised by the crate's tests are wired up (`s3`,
/// `http`/`https`); `gcs`/`azure` are intentionally omitted until there
/// is coverage for them.
///
/// **Slated for removal.** When the host wheel surfaces
/// `args.src.store` to `open_reader` via the `FFI_ObjectStore`
/// capsule machinery (in progress — see
/// <https://github.com/apache/sedona-db/pull/890>), this function (and
/// the `object_store` `aws`/`http` features that back it)
/// gets deleted — the store will arrive credentialed from the host's
/// `ObjectStoreRegistry` and `PyZarrChunkReader::new` will extract it
/// from the capsule instead. For `file://` and bare paths the
/// returned store is a no-op placeholder; the loader uses
/// `FilesystemStore` directly for the local case.
fn default_object_store_for_uri(uri: &str) -> Result<Arc<dyn ObjectStore>, PyErr> {
// file:// and bare paths use a LocalFileSystem rooted at `/`;
// open_storage_from_uri prefixes it at the group's path. This
// matches the store the host's ObjectStoreRegistry yields for
// file://, so the loader treats local and cloud uniformly.
if uri.starts_with("file://") || !uri.contains("://") {
return Ok(Arc::new(object_store::local::LocalFileSystem::new()));
}
let url = Url::parse(uri)
.map_err(|e| PyValueError::new_err(format!("group URI {uri:?} is not a valid URL: {e}")))?;
match url.scheme().to_ascii_lowercase().as_str() {
Comment thread
james-willis marked this conversation as resolved.
"s3" => {
use object_store::aws::AmazonS3Builder;
let store = AmazonS3Builder::from_env()
.with_url(uri)
.build()
.map_err(|e| PyValueError::new_err(build_err("s3", uri, e)))?;
Ok(Arc::new(store))
}
"http" | "https" => {
use object_store::http::HttpBuilder;
// open_storage_from_uri applies the path as a PrefixStore,
// so the HttpStore must be rooted at scheme+authority only
// — unlike S3, HttpBuilder roots at whatever URL it's given,
// so hand it the authority without the path.
let authority = format!("{}://{}", url.scheme(), url.authority());
let store = HttpBuilder::new()
.with_url(authority)
.build()
.map_err(|e| PyValueError::new_err(build_err("http", uri, e)))?;
Ok(Arc::new(store))
}
other => Err(PyValueError::new_err(format!(
"unsupported Zarr URI scheme {other:?}; expected one of: \
file, s3, http, https"
))),
}
}

fn build_err(backend: &str, uri: &str, err: object_store::Error) -> String {
format!(
"failed to build {backend} object_store for {uri}: {err}. \
Provide credentials via standard environment variables \
(e.g. AWS_ACCESS_KEY_ID/AWS_REGION for s3)."
)
}

#[pymodule]
fn _lib(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<PyZarrChunkReader>()?;
Expand Down
11 changes: 9 additions & 2 deletions rust/sedona-raster-zarr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,21 @@ arrow-schema = { workspace = true }
datafusion-common = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
object_store = { workspace = true }
sedona-common = { workspace = true }
sedona-raster = { workspace = true }
sedona-schema = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
zarrs = { workspace = true, features = ["filesystem", "gzip", "zstd", "blosc", "crc32c", "sharding", "transpose"] }
zarrs_filesystem = { workspace = true }
tokio = { workspace = true }
url = { workspace = true }
zarrs = { workspace = true, features = ["async", "gzip", "zstd", "blosc", "crc32c", "sharding", "transpose"] }
zarrs_object_store = { workspace = true }

[dev-dependencies]
object_store = { workspace = true, features = ["aws", "http"] }
tempfile = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
# Test fixtures write Zarr groups to a temp dir; the loader itself
# reads everything through object_store, so this is dev-only.
zarrs_filesystem = { workspace = true }
1 change: 1 addition & 0 deletions rust/sedona-raster-zarr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,4 @@ mod loader;
mod source_uri;

pub use loader::ZarrChunkReader;
pub use source_uri::open_storage_from_uri;
Loading
Loading