diff --git a/Cargo.lock b/Cargo.lock index a5f233704..26fc88229 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5735,14 +5735,19 @@ dependencies = [ name = "sedona-raster-gdal" version = "0.4.0" dependencies = [ + "arrow", "arrow-array", "arrow-buffer", + "arrow-schema", "criterion", "datafusion-common", + "datafusion-expr", "lru", "sedona-common", + "sedona-expr", "sedona-gdal", "sedona-raster", + "sedona-raster-functions", "sedona-schema", "sedona-testing", "tempfile", diff --git a/rust/sedona-raster-gdal/Cargo.toml b/rust/sedona-raster-gdal/Cargo.toml index 5dba31c98..ac1fccc30 100644 --- a/rust/sedona-raster-gdal/Cargo.toml +++ b/rust/sedona-raster-gdal/Cargo.toml @@ -31,13 +31,18 @@ rust-version.workspace = true result_large_err = "allow" [dependencies] +arrow = { workspace = true } arrow-array = { workspace = true } arrow-buffer = { workspace = true } +arrow-schema = { workspace = true } datafusion-common = { workspace = true } +datafusion-expr = { workspace = true } lru = { workspace = true } sedona-common = { workspace = true } +sedona-expr = { workspace = true } sedona-gdal = { workspace = true } sedona-raster = { workspace = true } +sedona-raster-functions = { workspace = true } sedona-schema = { workspace = true } [dev-dependencies] diff --git a/rust/sedona-raster-gdal/src/lib.rs b/rust/sedona-raster-gdal/src/lib.rs index 8e8c871fb..d19183450 100644 --- a/rust/sedona-raster-gdal/src/lib.rs +++ b/rust/sedona-raster-gdal/src/lib.rs @@ -32,6 +32,8 @@ mod gdal_common; #[allow(dead_code)] mod gdal_dataset_provider; +mod rs_from_path; +mod rs_metadata; mod utils; #[cfg(test)] @@ -42,4 +44,10 @@ pub use gdal_common::{ band_data_type_to_gdal, bytes_to_f64, gdal_to_band_data_type, gdal_type_byte_size, nodata_bytes_to_f64, nodata_f64_to_bytes, }; +pub use rs_from_path::rs_from_path_udf; +pub use rs_metadata::rs_metadata_udf; pub use utils::{append_as_indb_raster, dataset_to_indb_raster}; + +pub fn all_gdal_udfs() -> Vec { + vec![rs_from_path_udf(), rs_metadata_udf()] +} diff --git a/rust/sedona-raster-gdal/src/rs_from_path.rs b/rust/sedona-raster-gdal/src/rs_from_path.rs new file mode 100644 index 000000000..0bdcfcb09 --- /dev/null +++ b/rust/sedona-raster-gdal/src/rs_from_path.rs @@ -0,0 +1,316 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! RS_FromPath UDF - Load out-db raster from file path. + +use std::collections::HashMap; +use std::sync::Arc; + +use arrow::compute::cast; +use arrow_array::{Array, ArrayRef, StructArray}; +use arrow_schema::DataType; +use datafusion_common::cast::as_string_array; +use datafusion_common::config::ConfigOptions; +use datafusion_common::error::Result; +use datafusion_common::exec_datafusion_err; +use datafusion_expr::{ColumnarValue, Volatility}; +use sedona_expr::scalar_udf::{SedonaScalarKernel, SedonaScalarUDF}; +use sedona_gdal::gdal::Gdal; +use sedona_gdal::gdal_dyn_bindgen::{GDAL_OF_RASTER, GDAL_OF_READONLY}; +use sedona_gdal::raster::types::DatasetOptions; +use sedona_gdal::spatial_ref::SpatialRef; +use sedona_raster::builder::RasterBuilder; +use sedona_raster::traits::{BandMetadata, RasterMetadata}; +use sedona_schema::datatypes::{SedonaType, RASTER}; +use sedona_schema::matchers::ArgMatcher; +use sedona_schema::raster::StorageType; + +use crate::gdal_common::{ + gdal_to_band_data_type, nodata_f64_to_bytes, normalize_outdb_source_path, with_gdal, +}; +use crate::gdal_dataset_provider::configure_thread_local_options; + +pub fn rs_from_path_udf() -> SedonaScalarUDF { + SedonaScalarUDF::new( + "rs_frompath", + vec![ + Arc::new(RsFromPath::new(false)), + Arc::new(RsFromPath::new(true)), + ], + Volatility::Volatile, + ) +} + +#[derive(Debug)] +pub(crate) struct RsFromPath { + with_params: bool, +} + +impl RsFromPath { + pub(crate) fn new(with_params: bool) -> Self { + Self { with_params } + } + + #[allow(dead_code)] + fn parse_params(params: &str) -> HashMap { + params + .split(';') + .filter_map(|pair| { + let parts: Vec<&str> = pair.trim().splitn(2, '=').collect(); + if parts.len() == 2 { + Some((parts[0].trim().to_string(), parts[1].trim().to_string())) + } else { + None + } + }) + .collect() + } + + fn load_outdb_raster(gdal: &Gdal, path: &str, _params: Option<&str>) -> Result { + let gdal_path = normalize_outdb_source_path(path); + let dataset = gdal + .open_ex_with_options( + &gdal_path, + DatasetOptions { + open_flags: GDAL_OF_RASTER | GDAL_OF_READONLY, + ..Default::default() + }, + ) + .map_err(|e| { + exec_datafusion_err!( + "Failed to open raster file '{}'(GDAL path '{}'): {}", + path, + gdal_path, + e + ) + })?; + + let (width, height) = dataset.raster_size(); + let geotransform = dataset + .geo_transform() + .map_err(|e| exec_datafusion_err!("Failed to get geotransform: {}", e))?; + + let metadata = RasterMetadata { + width: width as u64, + height: height as u64, + upperleft_x: geotransform[0], + upperleft_y: geotransform[3], + scale_x: geotransform[1], + scale_y: geotransform[5], + skew_x: geotransform[2], + skew_y: geotransform[4], + }; + + let crs = dataset + .spatial_ref() + .ok() + .and_then(|sr: SpatialRef| sr.to_projjson().ok()); + + let mut builder = RasterBuilder::new(1); + builder + .start_raster(&metadata, crs.as_deref()) + .map_err(|e| exec_datafusion_err!("Failed to start raster: {}", e))?; + + let band_count = dataset.raster_count(); + for band_idx in 1..=band_count { + let band = dataset + .rasterband(band_idx) + .map_err(|e| exec_datafusion_err!("Failed to get band {}: {}", band_idx, e))?; + + let gdal_type = band.band_type(); + let band_data_type = gdal_to_band_data_type(gdal_type) + .map_err(|_| exec_datafusion_err!("Unsupported band data type: {:?}", gdal_type))?; + + let nodata_bytes = band + .no_data_value() + .map(|no_data| nodata_f64_to_bytes(no_data, &band_data_type)); + + let band_metadata = BandMetadata { + nodata_value: nodata_bytes, + storage_type: StorageType::OutDbRef, + datatype: band_data_type, + outdb_url: Some(path.to_string()), + outdb_band_id: Some(band_idx as u32), + }; + + builder + .start_band(band_metadata) + .map_err(|e| exec_datafusion_err!("Failed to start band: {}", e))?; + + builder.band_data_writer().append_value([]); + + builder + .finish_band() + .map_err(|e| exec_datafusion_err!("Failed to finish band: {}", e))?; + } + + builder + .finish_raster() + .map_err(|e| exec_datafusion_err!("Failed to finish raster: {}", e))?; + + builder + .finish() + .map_err(|e| exec_datafusion_err!("Failed to build raster: {}", e)) + } +} + +impl SedonaScalarKernel for RsFromPath { + fn return_type(&self, args: &[SedonaType]) -> Result> { + let matchers = if self.with_params { + vec![ArgMatcher::is_string(), ArgMatcher::is_string()] + } else { + vec![ArgMatcher::is_string()] + }; + + let matcher = ArgMatcher::new(matchers, RASTER); + matcher.match_args(args) + } + + fn invoke_batch( + &self, + arg_types: &[SedonaType], + args: &[ColumnarValue], + ) -> Result { + self.invoke_batch_from_args(arg_types, args, &SedonaType::Arrow(DataType::Null), 0, None) + } + + fn invoke_batch_from_args( + &self, + _arg_types: &[SedonaType], + args: &[ColumnarValue], + _return_type: &SedonaType, + _num_rows: usize, + config_options: Option<&ConfigOptions>, + ) -> Result { + with_gdal(|gdal| { + configure_thread_local_options(gdal, config_options)?; + + let (paths, params_opt) = match &args[0] { + ColumnarValue::Scalar(scalar) => { + let path = scalar.to_array().map_err(|e| { + exec_datafusion_err!("Failed to convert scalar to array: {}", e) + })?; + let params = if self.with_params { + match &args[1] { + ColumnarValue::Scalar(s) => Some(s.to_array().map_err(|e| { + exec_datafusion_err!("Failed to convert params scalar: {}", e) + })?), + ColumnarValue::Array(a) => Some(a.clone()), + } + } else { + None + }; + (path, params) + } + ColumnarValue::Array(array) => { + let params = if self.with_params { + match &args[1] { + ColumnarValue::Scalar(s) => Some(s.to_array().map_err(|e| { + exec_datafusion_err!("Failed to convert params scalar: {}", e) + })?), + ColumnarValue::Array(a) => Some(a.clone()), + } + } else { + None + }; + (array.clone(), params) + } + }; + + let paths = cast(&paths, &DataType::Utf8)?; + let path_array = as_string_array(&paths)?; + + let params_casted = params_opt.map(|p| cast(&p, &DataType::Utf8)).transpose()?; + let params_array = params_casted + .as_ref() + .map(|p| as_string_array(p.as_ref())) + .transpose()?; + + let len = path_array.len(); + if len == 0 { + let builder = RasterBuilder::new(0); + let result = builder + .finish() + .map_err(|e| exec_datafusion_err!("Failed to build empty raster: {}", e))?; + return Ok(ColumnarValue::Array(Arc::new(result))); + } + + let mut combined_arrays: Vec = Vec::with_capacity(len); + for i in 0..len { + if path_array.is_null(i) { + let mut builder = RasterBuilder::new(1); + builder + .append_null() + .map_err(|e| exec_datafusion_err!("Failed to append null: {}", e))?; + let result = builder + .finish() + .map_err(|e| exec_datafusion_err!("Failed to build null raster: {}", e))?; + combined_arrays.push(Arc::new(result)); + } else { + let path = path_array.value(i); + let params = params_array.and_then(|pa| { + if pa.is_null(i) { + None + } else { + Some(pa.value(i)) + } + }); + + let raster = Self::load_outdb_raster(gdal, path, params)?; + combined_arrays.push(Arc::new(raster)); + } + } + + let refs: Vec<&dyn Array> = combined_arrays.iter().map(|a| a.as_ref()).collect(); + let result = arrow::compute::concat(&refs) + .map_err(|e| exec_datafusion_err!("Failed to concatenate rasters: {}", e))?; + + match &args[0] { + ColumnarValue::Scalar(_) => { + let scalar = datafusion_common::ScalarValue::try_from_array(&result, 0)?; + Ok(ColumnarValue::Scalar(scalar)) + } + ColumnarValue::Array(_) => Ok(ColumnarValue::Array(result)), + } + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_params() { + let params = "key1=value1;key2=value2"; + let parsed = RsFromPath::parse_params(params); + assert_eq!(parsed.get("key1"), Some(&"value1".to_string())); + assert_eq!(parsed.get("key2"), Some(&"value2".to_string())); + + let parsed = RsFromPath::parse_params(""); + assert!(parsed.is_empty()); + + let parsed = RsFromPath::parse_params("option=true"); + assert_eq!(parsed.get("option"), Some(&"true".to_string())); + } + + #[test] + fn udf_from_path() { + let udf: datafusion_expr::ScalarUDF = rs_from_path_udf().into(); + assert_eq!(udf.name(), "rs_frompath"); + } +} diff --git a/rust/sedona-raster-gdal/src/rs_metadata.rs b/rust/sedona-raster-gdal/src/rs_metadata.rs new file mode 100644 index 000000000..4bb1eddee --- /dev/null +++ b/rust/sedona-raster-gdal/src/rs_metadata.rs @@ -0,0 +1,275 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{sync::Arc, vec}; + +use arrow_array::builder::{Float64Builder, Int32Builder, UInt64Builder}; +use arrow_array::StructArray; +use arrow_schema::{DataType, Field, Fields}; +use datafusion_common::config::ConfigOptions; +use datafusion_common::error::Result; +use datafusion_common::exec_datafusion_err; +use datafusion_expr::{ColumnarValue, Volatility}; +use sedona_expr::scalar_udf::{SedonaScalarKernel, SedonaScalarUDF}; +use sedona_raster::traits::RasterRef; +use sedona_schema::crs::deserialize_crs; +use sedona_schema::{datatypes::SedonaType, matchers::ArgMatcher}; + +use crate::gdal_common::with_gdal; +use crate::gdal_dataset_provider::{configure_thread_local_options, thread_local_provider}; + +pub fn rs_metadata_udf() -> SedonaScalarUDF { + SedonaScalarUDF::new( + "rs_metadata", + vec![Arc::new(RsMetaData {})], + Volatility::Immutable, + ) +} + +fn metadata_struct_fields() -> Fields { + Fields::from(vec![ + Field::new("upperLeftX", DataType::Float64, true), + Field::new("upperLeftY", DataType::Float64, true), + Field::new("gridWidth", DataType::UInt64, true), + Field::new("gridHeight", DataType::UInt64, true), + Field::new("scaleX", DataType::Float64, true), + Field::new("scaleY", DataType::Float64, true), + Field::new("skewX", DataType::Float64, true), + Field::new("skewY", DataType::Float64, true), + Field::new("srid", DataType::Int32, true), + Field::new("numSampleDimensions", DataType::UInt64, true), + Field::new("tileWidth", DataType::UInt64, true), + Field::new("tileHeight", DataType::UInt64, true), + ]) +} + +#[derive(Debug)] +struct RsMetaData {} + +impl SedonaScalarKernel for RsMetaData { + fn return_type(&self, args: &[SedonaType]) -> Result> { + let matcher = ArgMatcher::new( + vec![ArgMatcher::is_raster()], + SedonaType::Arrow(DataType::Struct(metadata_struct_fields())), + ); + + matcher.match_args(args) + } + + fn invoke_batch( + &self, + arg_types: &[SedonaType], + args: &[ColumnarValue], + ) -> Result { + self.invoke_batch_from_args(arg_types, args, &SedonaType::Arrow(DataType::Null), 0, None) + } + + fn invoke_batch_from_args( + &self, + arg_types: &[SedonaType], + args: &[ColumnarValue], + _return_type: &SedonaType, + _num_rows: usize, + config_options: Option<&ConfigOptions>, + ) -> Result { + let executor = sedona_raster_functions::RasterExecutor::new(arg_types, args); + let capacity = executor.num_iterations(); + + let mut upper_left_x_builder = Float64Builder::with_capacity(capacity); + let mut upper_left_y_builder = Float64Builder::with_capacity(capacity); + let mut grid_width_builder = UInt64Builder::with_capacity(capacity); + let mut grid_height_builder = UInt64Builder::with_capacity(capacity); + let mut scale_x_builder = Float64Builder::with_capacity(capacity); + let mut scale_y_builder = Float64Builder::with_capacity(capacity); + let mut skew_x_builder = Float64Builder::with_capacity(capacity); + let mut skew_y_builder = Float64Builder::with_capacity(capacity); + let mut srid_builder = Int32Builder::with_capacity(capacity); + let mut num_bands_builder = UInt64Builder::with_capacity(capacity); + let mut tile_width_builder = UInt64Builder::with_capacity(capacity); + let mut tile_height_builder = UInt64Builder::with_capacity(capacity); + + with_gdal(|gdal| { + configure_thread_local_options(gdal, config_options)?; + let provider = thread_local_provider(gdal) + .map_err(|e| exec_datafusion_err!("Failed to init GDAL provider: {e}"))?; + + executor.execute_raster_void(|_i, raster_opt| { + match raster_opt { + None => { + upper_left_x_builder.append_null(); + upper_left_y_builder.append_null(); + grid_width_builder.append_null(); + grid_height_builder.append_null(); + scale_x_builder.append_null(); + scale_y_builder.append_null(); + skew_x_builder.append_null(); + skew_y_builder.append_null(); + srid_builder.append_null(); + num_bands_builder.append_null(); + tile_width_builder.append_null(); + tile_height_builder.append_null(); + } + Some(raster) => { + let metadata = raster.metadata(); + + upper_left_x_builder.append_value(metadata.upper_left_x()); + upper_left_y_builder.append_value(metadata.upper_left_y()); + grid_width_builder.append_value(metadata.width()); + grid_height_builder.append_value(metadata.height()); + scale_x_builder.append_value(metadata.scale_x()); + scale_y_builder.append_value(metadata.scale_y()); + skew_x_builder.append_value(metadata.skew_x()); + skew_y_builder.append_value(metadata.skew_y()); + + let srid = match raster.crs() { + None => 0i32, + Some(crs_str) => match deserialize_crs(crs_str) { + Ok(Some(crs_ref)) => { + crs_ref.srid().ok().flatten().map(|s| s as i32).unwrap_or(0) + } + _ => 0i32, + }, + }; + srid_builder.append_value(srid); + + num_bands_builder.append_value(raster.bands().len() as u64); + + let dataset = provider.raster_ref_to_gdal(raster).map_err(|e| { + exec_datafusion_err!("Failed to create GDAL dataset: {e}") + })?; + let band1 = dataset + .as_dataset() + .rasterband(1) + .map_err(|e| exec_datafusion_err!("Failed to get band 1: {e}"))?; + let (block_x, block_y) = band1.block_size(); + tile_width_builder.append_value(block_x.max(1) as u64); + tile_height_builder.append_value(block_y.max(1) as u64); + } + } + Ok(()) + }) + })?; + + let struct_array = StructArray::new( + metadata_struct_fields(), + vec![ + Arc::new(upper_left_x_builder.finish()), + Arc::new(upper_left_y_builder.finish()), + Arc::new(grid_width_builder.finish()), + Arc::new(grid_height_builder.finish()), + Arc::new(scale_x_builder.finish()), + Arc::new(scale_y_builder.finish()), + Arc::new(skew_x_builder.finish()), + Arc::new(skew_y_builder.finish()), + Arc::new(srid_builder.finish()), + Arc::new(num_bands_builder.finish()), + Arc::new(tile_width_builder.finish()), + Arc::new(tile_height_builder.finish()), + ], + None, + ); + + executor.finish(Arc::new(struct_array)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow_array::cast::AsArray; + use datafusion_expr::ScalarUDF; + use sedona_raster::array::RasterStructArray; + use sedona_raster::builder::RasterBuilder; + use sedona_schema::datatypes::RASTER; + use sedona_testing::testers::ScalarUdfTester; + use tempfile::TempDir; + + fn write_test_geotiff() -> (StructArray, usize, usize) { + let temp_dir = TempDir::new().unwrap(); + let path = temp_dir.path().join("rs_metadata_test.tif"); + let path_str = path.to_str().unwrap().to_string(); + + with_gdal(|gdal| { + let driver = gdal.get_driver_by_name("GTiff").unwrap(); + let dataset = driver + .create_with_band_type::(&path_str, 10, 7, 1) + .unwrap(); + dataset + .set_geo_transform(&[100.0, 2.0, 0.0, 200.0, 0.0, -2.0]) + .unwrap(); + dataset.set_projection("EPSG:4326").unwrap(); + + let band = dataset.rasterband(1).unwrap(); + band.set_no_data_value(Some(255.0)).unwrap(); + + let raster_array = crate::utils::dataset_to_indb_raster(&dataset)?; + let raster_struct = RasterStructArray::new(&raster_array); + let raster = raster_struct.get(0).unwrap(); + let provider = thread_local_provider(gdal).unwrap(); + let provider_dataset = provider.raster_ref_to_gdal(&raster).unwrap(); + let provider_band = provider_dataset.as_dataset().rasterband(1).unwrap(); + let (block_x, block_y) = provider_band.block_size(); + + Ok::<_, datafusion_common::DataFusionError>((raster_array, block_x, block_y)) + }) + .unwrap() + } + + #[test] + fn rs_metadata_udf_docs() { + let udf: ScalarUDF = rs_metadata_udf().into(); + assert_eq!(udf.name(), "rs_metadata"); + } + + #[test] + fn rs_metadata_tile_dimensions_from_gdal() { + let (raster_array, block_x, block_y) = write_test_geotiff(); + + let udf: ScalarUDF = rs_metadata_udf().into(); + let tester = ScalarUdfTester::new(udf, vec![RASTER]); + let result = tester.invoke_array(Arc::new(raster_array)).unwrap(); + let struct_array = result.as_struct(); + + let tile_width = struct_array + .column(10) + .as_primitive::() + .value(0); + let tile_height = struct_array + .column(11) + .as_primitive::() + .value(0); + + assert_eq!(tile_width, block_x.max(1) as u64); + assert_eq!(tile_height, block_y.max(1) as u64); + } + + #[test] + fn rs_metadata_null_input_propagates_nulls() { + let mut builder = RasterBuilder::new(1); + builder.append_null().unwrap(); + let raster_array = builder.finish().unwrap(); + + let udf: ScalarUDF = rs_metadata_udf().into(); + let tester = ScalarUdfTester::new(udf, vec![RASTER]); + let result = tester.invoke_array(Arc::new(raster_array)).unwrap(); + let struct_array = result.as_struct(); + + for column in struct_array.columns() { + assert!(column.is_null(0)); + } + } +} diff --git a/rust/sedona/src/context.rs b/rust/sedona/src/context.rs index 7cd945911..ec4912eef 100644 --- a/rust/sedona/src/context.rs +++ b/rust/sedona/src/context.rs @@ -221,6 +221,10 @@ impl SedonaContext { Arc::new(RandomGeometryFunction::default()), ); + for udf in sedona_raster_gdal::all_gdal_udfs() { + out.ctx.register_udf(udf.into()); + } + // Always register default function set out.register_function_set(sedona_functions::register::default_function_set());