From cbcc45d46012ebfe18cf7ff280989d8e7e968354 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Fri, 29 May 2026 16:02:30 -0500 Subject: [PATCH 1/4] fmt --- Cargo.lock | 3 + c/sedona-extension/Cargo.toml | 3 + c/sedona-extension/src/lib.rs | 1 + c/sedona-extension/src/object_store.rs | 82 ++++++++++++++++++++++++++ 4 files changed, 89 insertions(+) create mode 100644 c/sedona-extension/src/object_store.rs diff --git a/Cargo.lock b/Cargo.lock index 7967387a5..269a857e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5775,9 +5775,12 @@ version = "0.4.0" dependencies = [ "arrow-array", "arrow-schema", + "async-trait", "datafusion-common", "datafusion-expr", + "futures", "libc", + "object_store", "sedona-common", "sedona-expr", "sedona-schema", diff --git a/c/sedona-extension/Cargo.toml b/c/sedona-extension/Cargo.toml index e8f8c17c5..b14241a11 100644 --- a/c/sedona-extension/Cargo.toml +++ b/c/sedona-extension/Cargo.toml @@ -30,8 +30,11 @@ rust-version.workspace = true [dependencies] arrow-array = { workspace = true, features = ["ffi"]} arrow-schema = { workspace = true, features = ["ffi"]} +async-trait = { workspace = true } datafusion-common = { workspace = true } datafusion-expr = { workspace = true } +futures = { workspace = true } +object_store = { workspace = true, default-features = false } libc = "0.2.178" sedona-common = { workspace = true } sedona-expr = { workspace = true } diff --git a/c/sedona-extension/src/lib.rs b/c/sedona-extension/src/lib.rs index 073ed939b..cd3ac1f07 100644 --- a/c/sedona-extension/src/lib.rs +++ b/c/sedona-extension/src/lib.rs @@ -16,4 +16,5 @@ // under the License. pub mod extension; +pub mod object_store; pub mod scalar_kernel; diff --git a/c/sedona-extension/src/object_store.rs b/c/sedona-extension/src/object_store.rs new file mode 100644 index 000000000..0078998e5 --- /dev/null +++ b/c/sedona-extension/src/object_store.rs @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::{Display, Formatter}; + +use async_trait::async_trait; +use futures::stream::BoxStream; +use object_store::{ + path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, + PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, +}; + +#[derive(Debug)] +pub struct SedonaCObjectStore; + +pub struct ImportedObjectStore { + inner: SedonaCObjectStore, +} + +impl Display for SedonaCObjectStore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "SedonaCObjectStore") + } +} + +#[async_trait] +impl ObjectStore for SedonaCObjectStore { + async fn put_opts( + &self, + _location: &Path, + _payload: PutPayload, + _opts: PutOptions, + ) -> Result { + todo!() + } + + async fn put_multipart_opts( + &self, + _location: &Path, + _opts: PutMultipartOptions, + ) -> Result> { + todo!() + } + + async fn get_opts(&self, _location: &Path, _options: GetOptions) -> Result { + todo!() + } + + async fn delete(&self, _location: &Path) -> Result<()> { + todo!() + } + + fn list(&self, _prefix: Option<&Path>) -> BoxStream<'static, Result> { + todo!() + } + + async fn list_with_delimiter(&self, _prefix: Option<&Path>) -> Result { + todo!() + } + + async fn copy(&self, _from: &Path, _to: &Path) -> Result<()> { + todo!() + } + + async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<()> { + todo!() + } +} From 6d0142adfeef1c49ad9c9114cc3512c266202482 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Fri, 29 May 2026 21:31:43 -0500 Subject: [PATCH 2/4] basic objstore wrapping --- c/sedona-extension/src/extension.rs | 46 ++++++ c/sedona-extension/src/object_store.rs | 196 +++++++++++++++++++++++-- 2 files changed, 227 insertions(+), 15 deletions(-) diff --git a/c/sedona-extension/src/extension.rs b/c/sedona-extension/src/extension.rs index 7d957a57c..accbb8b87 100644 --- a/c/sedona-extension/src/extension.rs +++ b/c/sedona-extension/src/extension.rs @@ -23,6 +23,52 @@ use std::{ use arrow_array::ffi::{FFI_ArrowArray, FFI_ArrowSchema}; +/// Raw FFI representation of an ObjectStore +/// +/// This follows the Arrow C Data Interface pattern with function pointers +/// for callbacks and a `release` callback for cleanup. +/// +/// See the ImportedObjectStore and ExportedObjectStore for high-level +/// APIs to import and export implementations using this struct. +#[derive(Default)] +#[repr(C)] +pub struct SedonaCObjectStore { + /// Callback to get a display string for the object store. + /// + /// The returned pointer must remain valid until the next call to any + /// method on this struct or until `release` is called. + /// + /// Return value: pointer to a null-terminated character array. + pub display: Option *const c_char>, + + /// Callback to get a debug string for the object store. + /// + /// The returned pointer must remain valid until the next call to any + /// method on this struct or until `release` is called. + /// + /// Return value: pointer to a null-terminated character array. + pub debug: Option *const c_char>, + + /// Release callback: release the object store's own resources. + pub release: Option, + + /// Opaque producer-specific data + pub private_data: *mut c_void, +} + +unsafe impl Send for SedonaCObjectStore {} +unsafe impl Sync for SedonaCObjectStore {} + +impl Drop for SedonaCObjectStore { + fn drop(&mut self) { + if let Some(releaser) = self.release { + unsafe { releaser(self) } + self.release = None; + self.private_data = null_mut(); + } + } +} + /// Raw FFI representation of the SedonaCScalarKernel /// /// See the ImportedScalarKernel and ExportedScalarKernel for high-level diff --git a/c/sedona-extension/src/object_store.rs b/c/sedona-extension/src/object_store.rs index 0078998e5..de2935cc4 100644 --- a/c/sedona-extension/src/object_store.rs +++ b/c/sedona-extension/src/object_store.rs @@ -15,36 +15,89 @@ // specific language governing permissions and limitations // under the License. -use std::fmt::{Display, Formatter}; +use std::ffi::{c_char, c_void, CStr, CString}; +use std::fmt::{Debug, Display, Formatter}; +use std::ptr::null_mut; +use std::sync::Arc; use async_trait::async_trait; +use datafusion_common::{DataFusionError, Result}; use futures::stream::BoxStream; use object_store::{ path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, - PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, + PutMultipartOptions, PutOptions, PutPayload, PutResult, }; +use sedona_common::sedona_internal_err; -#[derive(Debug)] -pub struct SedonaCObjectStore; +use crate::extension::SedonaCObjectStore; +/// Wrapper around a [SedonaCObjectStore] that implements [ObjectStore] +/// +/// This is the means by which an ObjectStore implementation may be imported from a +/// C implementation. pub struct ImportedObjectStore { inner: SedonaCObjectStore, } -impl Display for SedonaCObjectStore { +impl TryFrom for ImportedObjectStore { + type Error = DataFusionError; + + fn try_from(value: SedonaCObjectStore) -> Result { + match (&value.display, &value.debug, &value.release) { + (Some(_), Some(_), Some(_)) => Ok(Self { inner: value }), + _ => { + sedona_internal_err!("Can't import released or uninitialized SedonaCObjectStore") + } + } + } +} + +impl Display for ImportedObjectStore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + if let Some(display_fn) = self.inner.display { + let c_str = unsafe { display_fn(&self.inner) }; + if c_str.is_null() { + write!(f, "") + } else { + let rust_str = unsafe { CStr::from_ptr(c_str) }.to_string_lossy(); + write!(f, "{}", rust_str) + } + } else { + write!(f, "ImportedObjectStore()") + } + } +} + +impl Debug for ImportedObjectStore { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "SedonaCObjectStore") + if let Some(debug_fn) = self.inner.debug { + let c_str = unsafe { debug_fn(&self.inner) }; + if c_str.is_null() { + f.debug_struct("ImportedObjectStore") + .field("inner", &"") + .finish() + } else { + let rust_str = unsafe { CStr::from_ptr(c_str) }.to_string_lossy(); + f.debug_struct("ImportedObjectStore") + .field("inner", &rust_str) + .finish() + } + } else { + f.debug_struct("ImportedObjectStore") + .field("inner", &"") + .finish() + } } } #[async_trait] -impl ObjectStore for SedonaCObjectStore { +impl ObjectStore for ImportedObjectStore { async fn put_opts( &self, _location: &Path, _payload: PutPayload, _opts: PutOptions, - ) -> Result { + ) -> object_store::Result { todo!() } @@ -52,31 +105,144 @@ impl ObjectStore for SedonaCObjectStore { &self, _location: &Path, _opts: PutMultipartOptions, - ) -> Result> { + ) -> object_store::Result> { todo!() } - async fn get_opts(&self, _location: &Path, _options: GetOptions) -> Result { + async fn get_opts( + &self, + _location: &Path, + _options: GetOptions, + ) -> object_store::Result { todo!() } - async fn delete(&self, _location: &Path) -> Result<()> { + async fn delete(&self, _location: &Path) -> object_store::Result<()> { todo!() } - fn list(&self, _prefix: Option<&Path>) -> BoxStream<'static, Result> { + fn list(&self, _prefix: Option<&Path>) -> BoxStream<'static, object_store::Result> { todo!() } - async fn list_with_delimiter(&self, _prefix: Option<&Path>) -> Result { + async fn list_with_delimiter( + &self, + _prefix: Option<&Path>, + ) -> object_store::Result { todo!() } - async fn copy(&self, _from: &Path, _to: &Path) -> Result<()> { + async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()> { todo!() } - async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<()> { + async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> object_store::Result<()> { todo!() } } + +/// Wrapper around an [Arc] that may be used to export an existing +/// ObjectStore across an FFI boundary using the [SedonaCObjectStore] +pub struct ExportedObjectStore { + inner: Arc, + display_cache: CString, + debug_cache: CString, +} + +impl From> for ExportedObjectStore { + fn from(value: Arc) -> Self { + let display_str = format!("{}", value); + let debug_str = format!("{:?}", value); + ExportedObjectStore { + inner: value, + display_cache: CString::new(display_str).unwrap_or_default(), + debug_cache: CString::new(debug_str).unwrap_or_default(), + } + } +} + +impl From for SedonaCObjectStore { + fn from(value: ExportedObjectStore) -> Self { + let box_value = Box::new(value); + Self { + display: Some(c_object_store_display), + debug: Some(c_object_store_debug), + release: Some(c_object_store_release), + private_data: Box::leak(box_value) as *mut ExportedObjectStore as *mut c_void, + } + } +} + +/// C callable wrapper to get the display string +unsafe extern "C" fn c_object_store_display(self_: *const SedonaCObjectStore) -> *const c_char { + assert!(!self_.is_null()); + let self_ref = self_.as_ref().unwrap(); + + assert!(!self_ref.private_data.is_null()); + let private_data = (self_ref.private_data as *mut ExportedObjectStore) + .as_ref() + .unwrap(); + private_data.display_cache.as_ptr() +} + +/// C callable wrapper to get the debug string +unsafe extern "C" fn c_object_store_debug(self_: *const SedonaCObjectStore) -> *const c_char { + assert!(!self_.is_null()); + let self_ref = self_.as_ref().unwrap(); + + assert!(!self_ref.private_data.is_null()); + let private_data = (self_ref.private_data as *mut ExportedObjectStore) + .as_ref() + .unwrap(); + private_data.debug_cache.as_ptr() +} + +/// C callable wrapper called when this value is dropped via FFI +unsafe extern "C" fn c_object_store_release(self_: *mut SedonaCObjectStore) { + assert!(!self_.is_null()); + let self_ref = self_.as_mut().unwrap(); + + assert!(!self_ref.private_data.is_null()); + let boxed = Box::from_raw(self_ref.private_data as *mut ExportedObjectStore); + drop(boxed); + + self_ref.private_data = null_mut(); + self_ref.release = None; +} + +#[cfg(test)] +mod test { + use super::*; + use object_store::memory::InMemory; + + #[test] + fn test_export_import_display_debug() { + // Create an in-memory object store and export it + let store: Arc = Arc::new(InMemory::new()); + let original_display = format!("{}", store); + let original_debug = format!("{:?}", store); + + let exported = ExportedObjectStore::from(store); + let ffi_store: SedonaCObjectStore = exported.into(); + + // Import it back + let imported = ImportedObjectStore::try_from(ffi_store).unwrap(); + + // Verify Display works correctly (passes through unchanged) + let imported_display = format!("{}", imported); + assert_eq!(original_display, imported_display); + + // Verify Debug wraps in ImportedObjectStore + let imported_debug = format!("{:?}", imported); + assert!(imported_debug.contains("ImportedObjectStore")); + assert!(imported_debug.contains(&original_debug)); + } + + #[test] + fn test_invalid_import() { + // Create an empty/released SedonaCObjectStore + let empty = SedonaCObjectStore::default(); + let result = ImportedObjectStore::try_from(empty); + assert!(result.is_err()); + } +} From f72b5e7543e55903253ee3cd53c5c121cf799378 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Fri, 29 May 2026 21:58:44 -0500 Subject: [PATCH 3/4] add delete --- Cargo.lock | 1 + c/sedona-extension/Cargo.toml | 1 + c/sedona-extension/src/extension.rs | 92 +++++++ c/sedona-extension/src/object_store.rs | 319 +++++++++++++++++++++++-- 4 files changed, 399 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 269a857e9..4b068c8c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5785,6 +5785,7 @@ dependencies = [ "sedona-expr", "sedona-schema", "sedona-testing", + "tokio", ] [[package]] diff --git a/c/sedona-extension/Cargo.toml b/c/sedona-extension/Cargo.toml index b14241a11..7765150dd 100644 --- a/c/sedona-extension/Cargo.toml +++ b/c/sedona-extension/Cargo.toml @@ -36,6 +36,7 @@ datafusion-expr = { workspace = true } futures = { workspace = true } object_store = { workspace = true, default-features = false } libc = "0.2.178" +tokio = { workspace = true } sedona-common = { workspace = true } sedona-expr = { workspace = true } sedona-schema = { workspace = true } diff --git a/c/sedona-extension/src/extension.rs b/c/sedona-extension/src/extension.rs index accbb8b87..aff44f657 100644 --- a/c/sedona-extension/src/extension.rs +++ b/c/sedona-extension/src/extension.rs @@ -23,6 +23,79 @@ use std::{ use arrow_array::ffi::{FFI_ArrowArray, FFI_ArrowSchema}; +// ----------------------------------------------------------------------------- +// Async Result Handler +// ----------------------------------------------------------------------------- + +/// Raw FFI representation of an async result handler. +/// +/// This follows the Arrow Async Stream interface pattern where the consumer +/// provides callback handlers that the producer calls when the async operation +/// completes. This enables high-concurrency scenarios without blocking threads. +/// +/// The producer MUST call exactly one of `on_success` or `on_error`, followed +/// by `release`. The handler is invalid after `release` is called. +/// +/// Similar to Arrow's `ArrowAsyncDeviceStreamHandler`, but for single operations +/// rather than streams. +#[derive(Default)] +#[repr(C)] +pub struct SedonaCAsyncResultHandler { + /// Called on successful completion with optional payload data. + /// + /// `data` and `data_len` provide an optional byte payload. If no payload, + /// `data` is NULL and `data_len` is 0. The data is only valid for the + /// duration of this callback - the consumer must copy if needed longer. + /// + /// After this callback returns, the producer MUST call `release`. + pub on_success: Option< + unsafe extern "C" fn( + self_: *mut SedonaCAsyncResultHandler, + data: *const u8, + data_len: usize, + ), + >, + + /// Called on error with an error code and message. + /// + /// `code` is an `errno`-compatible error code. + /// `message` is a null-terminated error string, valid only for this callback. + /// + /// After this callback returns, the producer MUST call `release`. + pub on_error: Option< + unsafe extern "C" fn( + self_: *mut SedonaCAsyncResultHandler, + code: c_int, + message: *const c_char, + ), + >, + + /// Release callback to clean up the handler's resources. + /// + /// The producer MUST call this after calling either `on_success` or `on_error`. + /// After this is called, the handler is invalid and must not be used. + pub release: Option, + + /// Opaque consumer-specific data + pub private_data: *mut c_void, +} + +unsafe impl Send for SedonaCAsyncResultHandler {} + +impl Drop for SedonaCAsyncResultHandler { + fn drop(&mut self) { + if let Some(releaser) = self.release { + unsafe { releaser(self) } + self.release = None; + self.private_data = null_mut(); + } + } +} + +// ----------------------------------------------------------------------------- +// Object Store +// ----------------------------------------------------------------------------- + /// Raw FFI representation of an ObjectStore /// /// This follows the Arrow C Data Interface pattern with function pointers @@ -49,6 +122,25 @@ pub struct SedonaCObjectStore { /// Return value: pointer to a null-terminated character array. pub debug: Option *const c_char>, + /// Delete the object at the specified location. + /// + /// This is an async operation. The implementation MUST eventually call + /// either `on_success` or `on_error` on the handler, followed by `release`. + /// + /// `location` is a null-terminated path string that remains valid for the + /// duration of this call only. + /// + /// Return value: 0 if the operation was successfully initiated, + /// `errno`-compatible error code if the operation could not be started. + /// If non-zero is returned, the handler callbacks must NOT be called. + pub delete: Option< + unsafe extern "C" fn( + self_: *const SedonaCObjectStore, + location: *const c_char, + handler: *mut SedonaCAsyncResultHandler, + ) -> c_int, + >, + /// Release callback: release the object store's own resources. pub release: Option, diff --git a/c/sedona-extension/src/object_store.rs b/c/sedona-extension/src/object_store.rs index de2935cc4..df0dd9af4 100644 --- a/c/sedona-extension/src/object_store.rs +++ b/c/sedona-extension/src/object_store.rs @@ -15,21 +15,23 @@ // specific language governing permissions and limitations // under the License. -use std::ffi::{c_char, c_void, CStr, CString}; +use std::ffi::{c_char, c_int, c_void, CStr, CString}; use std::fmt::{Debug, Display, Formatter}; -use std::ptr::null_mut; +use std::ptr::{null, null_mut}; use std::sync::Arc; use async_trait::async_trait; use datafusion_common::{DataFusionError, Result}; +use futures::channel::oneshot; use futures::stream::BoxStream; use object_store::{ path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, }; use sedona_common::sedona_internal_err; +use tokio::runtime::Handle; -use crate::extension::SedonaCObjectStore; +use crate::extension::{SedonaCAsyncResultHandler, SedonaCObjectStore}; /// Wrapper around a [SedonaCObjectStore] that implements [ObjectStore] /// @@ -43,8 +45,8 @@ impl TryFrom for ImportedObjectStore { type Error = DataFusionError; fn try_from(value: SedonaCObjectStore) -> Result { - match (&value.display, &value.debug, &value.release) { - (Some(_), Some(_), Some(_)) => Ok(Self { inner: value }), + match (&value.display, &value.debug, &value.delete, &value.release) { + (Some(_), Some(_), Some(_), Some(_)) => Ok(Self { inner: value }), _ => { sedona_internal_err!("Can't import released or uninitialized SedonaCObjectStore") } @@ -90,6 +92,98 @@ impl Debug for ImportedObjectStore { } } +// ----------------------------------------------------------------------------- +// Imported Async Handler (Rust creates, C calls back) +// ----------------------------------------------------------------------------- + +/// Handler for async results when importing a C object store. +/// +/// This is created by the Rust import side and passed to C. When the async +/// operation completes, C calls back into this handler. +struct ImportedAsyncResultHandler { + sender: Option>>, +} + +impl ImportedAsyncResultHandler { + fn new(sender: oneshot::Sender>) -> Self { + Self { + sender: Some(sender), + } + } +} + +impl From for SedonaCAsyncResultHandler { + fn from(value: ImportedAsyncResultHandler) -> Self { + let box_value = Box::new(value); + Self { + on_success: Some(c_imported_handler_on_success), + on_error: Some(c_imported_handler_on_error), + release: Some(c_imported_handler_release), + private_data: Box::leak(box_value) as *mut ImportedAsyncResultHandler as *mut c_void, + } + } +} + +/// C callback for success (called by C implementation) +unsafe extern "C" fn c_imported_handler_on_success( + self_: *mut SedonaCAsyncResultHandler, + _data: *const u8, + _data_len: usize, +) { + assert!(!self_.is_null()); + let self_ref = self_.as_ref().unwrap(); + + assert!(!self_ref.private_data.is_null()); + let private_data = (self_ref.private_data as *mut ImportedAsyncResultHandler) + .as_mut() + .unwrap(); + + // Send success through the channel + if let Some(sender) = private_data.sender.take() { + let _ = sender.send(Ok(())); + } +} + +/// C callback for error (called by C implementation) +unsafe extern "C" fn c_imported_handler_on_error( + self_: *mut SedonaCAsyncResultHandler, + code: c_int, + message: *const c_char, +) { + assert!(!self_.is_null()); + let self_ref = self_.as_ref().unwrap(); + + assert!(!self_ref.private_data.is_null()); + let private_data = (self_ref.private_data as *mut ImportedAsyncResultHandler) + .as_mut() + .unwrap(); + + // Extract error message + let msg = if message.is_null() { + String::new() + } else { + CStr::from_ptr(message).to_string_lossy().into_owned() + }; + + // Send error through the channel + if let Some(sender) = private_data.sender.take() { + let _ = sender.send(Err((code, msg))); + } +} + +/// C callback for release (called by C implementation after on_success/on_error) +unsafe extern "C" fn c_imported_handler_release(self_: *mut SedonaCAsyncResultHandler) { + assert!(!self_.is_null()); + let self_ref = self_.as_mut().unwrap(); + + assert!(!self_ref.private_data.is_null()); + let boxed = Box::from_raw(self_ref.private_data as *mut ImportedAsyncResultHandler); + drop(boxed); + + self_ref.private_data = null_mut(); + self_ref.release = None; +} + #[async_trait] impl ObjectStore for ImportedObjectStore { async fn put_opts( @@ -117,8 +211,50 @@ impl ObjectStore for ImportedObjectStore { todo!() } - async fn delete(&self, _location: &Path) -> object_store::Result<()> { - todo!() + async fn delete(&self, location: &Path) -> object_store::Result<()> { + // Convert location to C string + let location_cstr = + CString::new(location.as_ref()).map_err(|e| object_store::Error::Generic { + store: "ImportedObjectStore", + source: e.into(), + })?; + + // Create a oneshot channel to receive the result + let (tx, rx) = oneshot::channel::>(); + + // Create the handler that signals the channel + let handler = ImportedAsyncResultHandler::new(tx); + let mut ffi_handler: SedonaCAsyncResultHandler = handler.into(); + + // Call the C delete function + let delete_fn = self.inner.delete.expect("delete callback missing"); + let code = unsafe { delete_fn(&self.inner, location_cstr.as_ptr(), &mut ffi_handler) }; + + if code != 0 { + // Operation failed to start - clean up handler manually + // Note: we don't call release since the C side didn't accept the handler + std::mem::forget(ffi_handler); + return Err(object_store::Error::Generic { + store: "ImportedObjectStore", + source: format!("delete failed to start: errno {}", code).into(), + }); + } + + // Await the result from the callback + // Note: the handler is now owned by the C side, which will call release + std::mem::forget(ffi_handler); + + match rx.await { + Ok(Ok(())) => Ok(()), + Ok(Err((code, msg))) => Err(object_store::Error::Generic { + store: "ImportedObjectStore", + source: format!("delete failed ({}): {}", code, msg).into(), + }), + Err(_) => Err(object_store::Error::Generic { + store: "ImportedObjectStore", + source: "delete callback channel cancelled".into(), + }), + } } fn list(&self, _prefix: Option<&Path>) -> BoxStream<'static, object_store::Result> { @@ -145,28 +281,45 @@ impl ObjectStore for ImportedObjectStore { /// ObjectStore across an FFI boundary using the [SedonaCObjectStore] pub struct ExportedObjectStore { inner: Arc, + runtime_handle: Handle, display_cache: CString, debug_cache: CString, } -impl From> for ExportedObjectStore { - fn from(value: Arc) -> Self { - let display_str = format!("{}", value); - let debug_str = format!("{:?}", value); +impl ExportedObjectStore { + /// Create a new ExportedObjectStore with an explicit runtime handle. + /// + /// The runtime handle is used to spawn async operations when C calls + /// into this object store. + pub fn new(store: Arc, runtime_handle: Handle) -> Self { + let display_str = format!("{}", store); + let debug_str = format!("{:?}", store); ExportedObjectStore { - inner: value, + inner: store, + runtime_handle, display_cache: CString::new(display_str).unwrap_or_default(), debug_cache: CString::new(debug_str).unwrap_or_default(), } } } +impl From> for ExportedObjectStore { + /// Create an ExportedObjectStore using the current tokio runtime. + /// + /// # Panics + /// Panics if not called from within a tokio runtime context. + fn from(value: Arc) -> Self { + Self::new(value, Handle::current()) + } +} + impl From for SedonaCObjectStore { fn from(value: ExportedObjectStore) -> Self { let box_value = Box::new(value); Self { display: Some(c_object_store_display), debug: Some(c_object_store_debug), + delete: Some(c_object_store_delete), release: Some(c_object_store_release), private_data: Box::leak(box_value) as *mut ExportedObjectStore as *mut c_void, } @@ -197,6 +350,101 @@ unsafe extern "C" fn c_object_store_debug(self_: *const SedonaCObjectStore) -> * private_data.debug_cache.as_ptr() } +/// C callable wrapper for async delete operation +/// +/// This spawns the async delete on the runtime and calls the handler callbacks +/// when complete. +unsafe extern "C" fn c_object_store_delete( + self_: *const SedonaCObjectStore, + location: *const c_char, + handler: *mut SedonaCAsyncResultHandler, +) -> c_int { + assert!(!self_.is_null()); + let self_ref = self_.as_ref().unwrap(); + + assert!(!self_ref.private_data.is_null()); + let private_data = (self_ref.private_data as *mut ExportedObjectStore) + .as_ref() + .unwrap(); + + // Validate handler + if handler.is_null() { + return libc::EINVAL; + } + let handler_ref = handler.as_ref().unwrap(); + if handler_ref.on_success.is_none() + || handler_ref.on_error.is_none() + || handler_ref.release.is_none() + { + return libc::EINVAL; + } + + // Convert location to Rust Path + if location.is_null() { + return libc::EINVAL; + } + let location_str = match CStr::from_ptr(location).to_str() { + Ok(s) => s, + Err(_) => return libc::EINVAL, + }; + let path = Path::from(location_str); + + // Clone what we need for the async task + let store = private_data.inner.clone(); + let handle = private_data.runtime_handle.clone(); + + // Wrap the handler pointer in a Send-safe wrapper + let handler_wrapper = SendableHandler(handler); + + // Spawn the async work - only capture the wrapper, not the raw pointer + handle.spawn(async move { + let result = store.delete(&path).await; + + // Now extract the handler pointer after the await + handler_wrapper.complete(result); + }); + + 0 // Operation accepted +} + +/// Wrapper to allow sending a handler pointer across threads. +/// +/// # Safety +/// The caller must ensure the handler pointer remains valid until the +/// async operation completes and callbacks are invoked. +struct SendableHandler(*mut SedonaCAsyncResultHandler); +unsafe impl Send for SendableHandler {} + +impl SendableHandler { + /// Complete the async operation and call the appropriate callbacks. + /// + /// # Safety + /// The handler pointer must still be valid. + unsafe fn complete(self, result: object_store::Result<()>) { + let handler = self.0; + + // Call the appropriate callback + match result { + Ok(()) => { + if let Some(on_success) = (*handler).on_success { + on_success(handler, null(), 0); + } + } + Err(e) => { + let msg = CString::new(e.to_string()).unwrap_or_default(); + if let Some(on_error) = (*handler).on_error { + on_error(handler, libc::EIO, msg.as_ptr()); + } + } + } + + // Always call release + if let Some(release) = (*handler).release { + release(handler); + } + } +} + /// C callable wrapper called when this value is dropped via FFI unsafe extern "C" fn c_object_store_release(self_: *mut SedonaCObjectStore) { assert!(!self_.is_null()); @@ -214,9 +462,10 @@ unsafe extern "C" fn c_object_store_release(self_: *mut SedonaCObjectStore) { mod test { use super::*; use object_store::memory::InMemory; + use object_store::PutPayload; - #[test] - fn test_export_import_display_debug() { + #[tokio::test] + async fn test_export_import_display_debug() { // Create an in-memory object store and export it let store: Arc = Arc::new(InMemory::new()); let original_display = format!("{}", store); @@ -245,4 +494,46 @@ mod test { let result = ImportedObjectStore::try_from(empty); assert!(result.is_err()); } + + #[tokio::test] + async fn test_delete_roundtrip() { + // Create an in-memory object store with a file + let store = Arc::new(InMemory::new()); + let path = Path::from("test/file.txt"); + store + .put(&path, PutPayload::from_static(b"hello")) + .await + .unwrap(); + + // Verify the file exists + assert!(store.get(&path).await.is_ok()); + + // Export and import the store + let exported = ExportedObjectStore::from(store.clone() as Arc); + let ffi_store: SedonaCObjectStore = exported.into(); + let imported = ImportedObjectStore::try_from(ffi_store).unwrap(); + + // Delete through the FFI boundary + imported.delete(&path).await.unwrap(); + + // Verify the file is gone (check via original store reference) + assert!(store.get(&path).await.is_err()); + } + + #[tokio::test] + async fn test_delete_nonexistent() { + // Create an empty in-memory object store + let store: Arc = Arc::new(InMemory::new()); + + // Export and import + let exported = ExportedObjectStore::from(store); + let ffi_store: SedonaCObjectStore = exported.into(); + let imported = ImportedObjectStore::try_from(ffi_store).unwrap(); + + // Delete a non-existent file + // Note: InMemory store succeeds even for non-existent files + let path = Path::from("nonexistent.txt"); + let result = imported.delete(&path).await; + assert!(result.is_ok()); + } } From 47c77e405503c5cebf020a3ca92f87350fab1ea9 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Fri, 29 May 2026 22:08:11 -0500 Subject: [PATCH 4/4] implement copy --- c/sedona-extension/src/extension.rs | 41 ++++ c/sedona-extension/src/object_store.rs | 323 ++++++++++++++++++++++++- 2 files changed, 358 insertions(+), 6 deletions(-) diff --git a/c/sedona-extension/src/extension.rs b/c/sedona-extension/src/extension.rs index aff44f657..0f08e43ee 100644 --- a/c/sedona-extension/src/extension.rs +++ b/c/sedona-extension/src/extension.rs @@ -141,6 +141,47 @@ pub struct SedonaCObjectStore { ) -> c_int, >, + /// Copy an object from one location to another. + /// + /// This is an async operation. The implementation MUST eventually call + /// either `on_success` or `on_error` on the handler, followed by `release`. + /// + /// `from` and `to` are null-terminated path strings that remain valid for + /// the duration of this call only. + /// + /// Return value: 0 if the operation was successfully initiated, + /// `errno`-compatible error code if the operation could not be started. + /// If non-zero is returned, the handler callbacks must NOT be called. + pub copy: Option< + unsafe extern "C" fn( + self_: *const SedonaCObjectStore, + from: *const c_char, + to: *const c_char, + handler: *mut SedonaCAsyncResultHandler, + ) -> c_int, + >, + + /// Copy an object from one location to another, only if the destination + /// does not already exist. + /// + /// This is an async operation. The implementation MUST eventually call + /// either `on_success` or `on_error` on the handler, followed by `release`. + /// + /// `from` and `to` are null-terminated path strings that remain valid for + /// the duration of this call only. + /// + /// Return value: 0 if the operation was successfully initiated, + /// `errno`-compatible error code if the operation could not be started. + /// If non-zero is returned, the handler callbacks must NOT be called. + pub copy_if_not_exists: Option< + unsafe extern "C" fn( + self_: *const SedonaCObjectStore, + from: *const c_char, + to: *const c_char, + handler: *mut SedonaCAsyncResultHandler, + ) -> c_int, + >, + /// Release callback: release the object store's own resources. pub release: Option, diff --git a/c/sedona-extension/src/object_store.rs b/c/sedona-extension/src/object_store.rs index df0dd9af4..ee98f2a98 100644 --- a/c/sedona-extension/src/object_store.rs +++ b/c/sedona-extension/src/object_store.rs @@ -45,8 +45,15 @@ impl TryFrom for ImportedObjectStore { type Error = DataFusionError; fn try_from(value: SedonaCObjectStore) -> Result { - match (&value.display, &value.debug, &value.delete, &value.release) { - (Some(_), Some(_), Some(_), Some(_)) => Ok(Self { inner: value }), + match ( + &value.display, + &value.debug, + &value.delete, + &value.copy, + &value.copy_if_not_exists, + &value.release, + ) { + (Some(_), Some(_), Some(_), Some(_), Some(_), Some(_)) => Ok(Self { inner: value }), _ => { sedona_internal_err!("Can't import released or uninitialized SedonaCObjectStore") } @@ -268,12 +275,111 @@ impl ObjectStore for ImportedObjectStore { todo!() } - async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()> { - todo!() + async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> { + // Convert paths to C strings + let from_cstr = CString::new(from.as_ref()).map_err(|e| object_store::Error::Generic { + store: "ImportedObjectStore", + source: e.into(), + })?; + let to_cstr = CString::new(to.as_ref()).map_err(|e| object_store::Error::Generic { + store: "ImportedObjectStore", + source: e.into(), + })?; + + // Create a oneshot channel to receive the result + let (tx, rx) = oneshot::channel::>(); + + // Create the handler that signals the channel + let handler = ImportedAsyncResultHandler::new(tx); + let mut ffi_handler: SedonaCAsyncResultHandler = handler.into(); + + // Call the C copy function + let copy_fn = self.inner.copy.expect("copy callback missing"); + let code = unsafe { + copy_fn( + &self.inner, + from_cstr.as_ptr(), + to_cstr.as_ptr(), + &mut ffi_handler, + ) + }; + + if code != 0 { + std::mem::forget(ffi_handler); + return Err(object_store::Error::Generic { + store: "ImportedObjectStore", + source: format!("copy failed to start: errno {}", code).into(), + }); + } + + std::mem::forget(ffi_handler); + + match rx.await { + Ok(Ok(())) => Ok(()), + Ok(Err((code, msg))) => Err(object_store::Error::Generic { + store: "ImportedObjectStore", + source: format!("copy failed ({}): {}", code, msg).into(), + }), + Err(_) => Err(object_store::Error::Generic { + store: "ImportedObjectStore", + source: "copy callback channel cancelled".into(), + }), + } } - async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> object_store::Result<()> { - todo!() + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> { + // Convert paths to C strings + let from_cstr = CString::new(from.as_ref()).map_err(|e| object_store::Error::Generic { + store: "ImportedObjectStore", + source: e.into(), + })?; + let to_cstr = CString::new(to.as_ref()).map_err(|e| object_store::Error::Generic { + store: "ImportedObjectStore", + source: e.into(), + })?; + + // Create a oneshot channel to receive the result + let (tx, rx) = oneshot::channel::>(); + + // Create the handler that signals the channel + let handler = ImportedAsyncResultHandler::new(tx); + let mut ffi_handler: SedonaCAsyncResultHandler = handler.into(); + + // Call the C copy_if_not_exists function + let copy_fn = self + .inner + .copy_if_not_exists + .expect("copy_if_not_exists callback missing"); + let code = unsafe { + copy_fn( + &self.inner, + from_cstr.as_ptr(), + to_cstr.as_ptr(), + &mut ffi_handler, + ) + }; + + if code != 0 { + std::mem::forget(ffi_handler); + return Err(object_store::Error::Generic { + store: "ImportedObjectStore", + source: format!("copy_if_not_exists failed to start: errno {}", code).into(), + }); + } + + std::mem::forget(ffi_handler); + + match rx.await { + Ok(Ok(())) => Ok(()), + Ok(Err((code, msg))) => Err(object_store::Error::Generic { + store: "ImportedObjectStore", + source: format!("copy_if_not_exists failed ({}): {}", code, msg).into(), + }), + Err(_) => Err(object_store::Error::Generic { + store: "ImportedObjectStore", + source: "copy_if_not_exists callback channel cancelled".into(), + }), + } } } @@ -320,6 +426,8 @@ impl From for SedonaCObjectStore { display: Some(c_object_store_display), debug: Some(c_object_store_debug), delete: Some(c_object_store_delete), + copy: Some(c_object_store_copy), + copy_if_not_exists: Some(c_object_store_copy_if_not_exists), release: Some(c_object_store_release), private_data: Box::leak(box_value) as *mut ExportedObjectStore as *mut c_void, } @@ -407,6 +515,128 @@ unsafe extern "C" fn c_object_store_delete( 0 // Operation accepted } +/// C callable wrapper for async copy operation +/// +/// This spawns the async copy on the runtime and calls the handler callbacks +/// when complete. +unsafe extern "C" fn c_object_store_copy( + self_: *const SedonaCObjectStore, + from: *const c_char, + to: *const c_char, + handler: *mut SedonaCAsyncResultHandler, +) -> c_int { + assert!(!self_.is_null()); + let self_ref = self_.as_ref().unwrap(); + + assert!(!self_ref.private_data.is_null()); + let private_data = (self_ref.private_data as *mut ExportedObjectStore) + .as_ref() + .unwrap(); + + // Validate handler + if handler.is_null() { + return libc::EINVAL; + } + let handler_ref = handler.as_ref().unwrap(); + if handler_ref.on_success.is_none() + || handler_ref.on_error.is_none() + || handler_ref.release.is_none() + { + return libc::EINVAL; + } + + // Convert paths to Rust Path + if from.is_null() || to.is_null() { + return libc::EINVAL; + } + let from_str = match CStr::from_ptr(from).to_str() { + Ok(s) => s, + Err(_) => return libc::EINVAL, + }; + let to_str = match CStr::from_ptr(to).to_str() { + Ok(s) => s, + Err(_) => return libc::EINVAL, + }; + let from_path = Path::from(from_str); + let to_path = Path::from(to_str); + + // Clone what we need for the async task + let store = private_data.inner.clone(); + let handle = private_data.runtime_handle.clone(); + + // Wrap the handler pointer in a Send-safe wrapper + let handler_wrapper = SendableHandler(handler); + + // Spawn the async work + handle.spawn(async move { + let result = store.copy(&from_path, &to_path).await; + handler_wrapper.complete(result); + }); + + 0 // Operation accepted +} + +/// C callable wrapper for async copy_if_not_exists operation +/// +/// This spawns the async copy_if_not_exists on the runtime and calls the handler +/// callbacks when complete. +unsafe extern "C" fn c_object_store_copy_if_not_exists( + self_: *const SedonaCObjectStore, + from: *const c_char, + to: *const c_char, + handler: *mut SedonaCAsyncResultHandler, +) -> c_int { + assert!(!self_.is_null()); + let self_ref = self_.as_ref().unwrap(); + + assert!(!self_ref.private_data.is_null()); + let private_data = (self_ref.private_data as *mut ExportedObjectStore) + .as_ref() + .unwrap(); + + // Validate handler + if handler.is_null() { + return libc::EINVAL; + } + let handler_ref = handler.as_ref().unwrap(); + if handler_ref.on_success.is_none() + || handler_ref.on_error.is_none() + || handler_ref.release.is_none() + { + return libc::EINVAL; + } + + // Convert paths to Rust Path + if from.is_null() || to.is_null() { + return libc::EINVAL; + } + let from_str = match CStr::from_ptr(from).to_str() { + Ok(s) => s, + Err(_) => return libc::EINVAL, + }; + let to_str = match CStr::from_ptr(to).to_str() { + Ok(s) => s, + Err(_) => return libc::EINVAL, + }; + let from_path = Path::from(from_str); + let to_path = Path::from(to_str); + + // Clone what we need for the async task + let store = private_data.inner.clone(); + let handle = private_data.runtime_handle.clone(); + + // Wrap the handler pointer in a Send-safe wrapper + let handler_wrapper = SendableHandler(handler); + + // Spawn the async work + handle.spawn(async move { + let result = store.copy_if_not_exists(&from_path, &to_path).await; + handler_wrapper.complete(result); + }); + + 0 // Operation accepted +} + /// Wrapper to allow sending a handler pointer across threads. /// /// # Safety @@ -536,4 +766,85 @@ mod test { let result = imported.delete(&path).await; assert!(result.is_ok()); } + + #[tokio::test] + async fn test_copy_roundtrip() { + // Create an in-memory object store with a file + let store = Arc::new(InMemory::new()); + let src_path = Path::from("source.txt"); + let dst_path = Path::from("dest.txt"); + store + .put(&src_path, PutPayload::from_static(b"hello copy")) + .await + .unwrap(); + + // Export and import the store + let exported = ExportedObjectStore::from(store.clone() as Arc); + let ffi_store: SedonaCObjectStore = exported.into(); + let imported = ImportedObjectStore::try_from(ffi_store).unwrap(); + + // Copy through the FFI boundary + imported.copy(&src_path, &dst_path).await.unwrap(); + + // Verify both files exist with same content + let src_data = store.get(&src_path).await.unwrap().bytes().await.unwrap(); + let dst_data = store.get(&dst_path).await.unwrap().bytes().await.unwrap(); + assert_eq!(src_data, dst_data); + assert_eq!(&src_data[..], b"hello copy"); + } + + #[tokio::test] + async fn test_copy_if_not_exists_success() { + // Create an in-memory object store with a file + let store = Arc::new(InMemory::new()); + let src_path = Path::from("source.txt"); + let dst_path = Path::from("new_dest.txt"); + store + .put(&src_path, PutPayload::from_static(b"hello")) + .await + .unwrap(); + + // Export and import the store + let exported = ExportedObjectStore::from(store.clone() as Arc); + let ffi_store: SedonaCObjectStore = exported.into(); + let imported = ImportedObjectStore::try_from(ffi_store).unwrap(); + + // Copy should succeed since dest doesn't exist + imported + .copy_if_not_exists(&src_path, &dst_path) + .await + .unwrap(); + + // Verify dest was created + assert!(store.get(&dst_path).await.is_ok()); + } + + #[tokio::test] + async fn test_copy_if_not_exists_fails_when_exists() { + // Create an in-memory object store with source and dest files + let store = Arc::new(InMemory::new()); + let src_path = Path::from("source.txt"); + let dst_path = Path::from("existing_dest.txt"); + store + .put(&src_path, PutPayload::from_static(b"source")) + .await + .unwrap(); + store + .put(&dst_path, PutPayload::from_static(b"existing")) + .await + .unwrap(); + + // Export and import the store + let exported = ExportedObjectStore::from(store.clone() as Arc); + let ffi_store: SedonaCObjectStore = exported.into(); + let imported = ImportedObjectStore::try_from(ffi_store).unwrap(); + + // Copy should fail since dest exists + let result = imported.copy_if_not_exists(&src_path, &dst_path).await; + assert!(result.is_err()); + + // Verify dest was not overwritten + let dst_data = store.get(&dst_path).await.unwrap().bytes().await.unwrap(); + assert_eq!(&dst_data[..], b"existing"); + } }