Skip to content
Draft
63 changes: 54 additions & 9 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,22 @@ lazy val sparkV1 = (project in file("spark"))
"org.apache.spark" %% "spark-core" % sparkVersion.value % "provided",
"org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "provided",
// For DynamoDBCommitStore
"com.amazonaws" % "aws-java-sdk" % "1.12.262" % "provided",

"com.amazonaws" % "aws-java-sdk" % "1.12.262" % "provided"
) ++ {
if (unityCatalogVersion >= "0.5.0") {
Seq(
"io.unitycatalog" % "unitycatalog-hadoop" % unityCatalogVersion excludeAll(
ExclusionRule(organization = "org.openapitools"),
ExclusionRule(organization = "com.fasterxml.jackson.core"),
ExclusionRule(organization = "com.fasterxml.jackson.module"),
ExclusionRule(organization = "com.fasterxml.jackson.datatype"),
ExclusionRule(organization = "com.fasterxml.jackson.dataformat")
)
)
} else {
Seq.empty
}
} ++ Seq(
// Test deps
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"org.scalatestplus" %% "scalacheck-1-15" % "3.2.9.0" % "test",
Expand Down Expand Up @@ -660,16 +674,40 @@ lazy val spark = (project in file("spark-unified"))
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided",
"org.apache.spark" %% "spark-core" % sparkVersion.value % "provided",
"org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "provided",
"com.amazonaws" % "aws-java-sdk" % "1.12.262" % "provided",

"com.amazonaws" % "aws-java-sdk" % "1.12.262" % "provided"
) ++ {
if (unityCatalogVersion >= "0.5.0") {
Seq(
"io.unitycatalog" % "unitycatalog-hadoop" % unityCatalogVersion excludeAll(
ExclusionRule(organization = "org.openapitools"),
ExclusionRule(organization = "com.fasterxml.jackson.core"),
ExclusionRule(organization = "com.fasterxml.jackson.module"),
ExclusionRule(organization = "com.fasterxml.jackson.datatype"),
ExclusionRule(organization = "com.fasterxml.jackson.dataformat")
)
)
} else {
Seq.empty
}
} ++ Seq(
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"org.scalatestplus" %% "scalacheck-1-15" % "3.2.9.0" % "test",
"junit" % "junit" % "4.13.2" % "test",
"com.novocode" % "junit-interface" % "0.11" % "test",
"org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-hive" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-hive" % sparkVersion.value % "test" classifier "tests"
) ++ {
if (unityCatalogVersion >= "0.5.0") {
Seq(
// unitycatalog-hadoop references the ABFS token-provider interface during classloading.
"org.apache.hadoop" % "hadoop-azure" % hadoopVersion % "test"
)
} else {
Seq.empty
}
} ++ Seq(
"org.mockito" % "mockito-inline" % "4.11.0" % "test",
),

Expand Down Expand Up @@ -843,13 +881,19 @@ Global / ensurePinnedUnityCatalog := {
val home = file(sys.props("user.home"))
// Check both layouts: a restored sbt cache can pre-populate ivy alone, leaving m2 empty -
// checking only ivy would silently skip the slow publish and break mvn-based consumers.
val ivy2Canary = home / ".ivy2" / "local" / "io.unitycatalog" /
val ivy2ClientCanary = home / ".ivy2" / "local" / "io.unitycatalog" /
"unitycatalog-client" / unityCatalogVersion / "ivys" / "ivy.xml"
val m2Canary = home / ".m2" / "repository" / "io" / "unitycatalog" /
val m2ClientCanary = home / ".m2" / "repository" / "io" / "unitycatalog" /
"unitycatalog-client" / unityCatalogVersion /
s"unitycatalog-client-$unityCatalogVersion.pom"
if (!ivy2Canary.exists || !m2Canary.exists) {
publishPinnedUnityCatalog(log, ivy2Canary)
val ivy2HadoopCanary = home / ".ivy2" / "local" / "io.unitycatalog" /
"unitycatalog-hadoop" / unityCatalogVersion / "ivys" / "ivy.xml"
val m2HadoopCanary = home / ".m2" / "repository" / "io" / "unitycatalog" /
"unitycatalog-hadoop" / unityCatalogVersion /
s"unitycatalog-hadoop-$unityCatalogVersion.pom"
if (!Seq(ivy2ClientCanary, m2ClientCanary, ivy2HadoopCanary, m2HadoopCanary)
.forall(_.exists)) {
publishPinnedUnityCatalog(log, ivy2ClientCanary)
}
}
}
Expand Down Expand Up @@ -1197,6 +1241,7 @@ lazy val storage = (project in file("storage"))
commonSettings,
exportJars := true,
javaOnlyReleaseSettings,

libraryDependencies ++= Seq(
// User can provide any 2.x or 3.x version. We don't use any new fancy APIs. Watch out for
// versions with known vulnerabilities.
Expand Down
24 changes: 13 additions & 11 deletions project/scripts/setup_unitycatalog_main.sh
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ set -euo pipefail
# The pin. Bump both lines together if UC's version.sbt changed at the new SHA. build.sbt's
# `unityCatalogVersion` is obtained by running this script with `--print-version`, so these two
# values are the single source of truth.
UC_PIN_SHA=e6deb37e890a0a6fb8ae495b5bec52326731f6a6
UC_PIN_SHA=9844a3002d7fdf41e8ad65ff3c07117fc2a9eba0
UC_BASE_VERSION=0.5.0-SNAPSHOT
# ---------------------------------------------------------------------------------------------

Expand Down Expand Up @@ -89,7 +89,7 @@ if [[ "${1:-}" == "--print-version" ]]; then
exit 0
fi

# Canonical Ivy + Maven artifact paths. Delta depends on all three UC modules; sbt resolves from
# Canonical Ivy + Maven artifact paths. Delta depends on these UC modules; sbt resolves from
# ~/.ivy2/local, mvn (kernel-examples integration tests) resolves from ~/.m2/repository. If any
# is missing in either layout we must re-publish.
IVY_LOCAL="$HOME/.ivy2/local/io.unitycatalog"
Expand Down Expand Up @@ -118,24 +118,26 @@ if [[ "$UC_FORCE" != "1" ]] && all_canaries_present; then
exit 0
fi

echo ">>> Fetching Unity Catalog main from $UC_REPO"
echo ">>> Fetching Unity Catalog from $UC_REPO"
rm -rf "$UC_DIR"
mkdir -p "$UC_DIR"
# Fetch main's full history so we can run `git merge-base --is-ancestor` below to verify the
# pinned SHA is actually on main. UC's repo is small; full fetch of one branch is cheap.
# Fetch the target branch so we can verify the pinned SHA is reachable.
git -C "$UC_DIR" init --quiet
git -C "$UC_DIR" remote add origin "$UC_REPO"
git -C "$UC_DIR" fetch --quiet origin main
if [[ "$UC_REF" == "main" ]]; then
git -C "$UC_DIR" fetch --quiet origin main
else
git -C "$UC_DIR" fetch --quiet origin "$UC_PIN_SHA"
fi

cd "$UC_DIR"

# Safety check: the pinned SHA must be reachable from UC main. Local `merge-base --is-ancestor`
# Safety check: the pinned SHA must be reachable from the fetched branch. Local `merge-base --is-ancestor`
# on the history we just fetched - no GitHub API, no token needed. Only applies when UC_REF is
# the pinned SHA; UC_REF=main is trivially on main.
if [[ "$UC_REF" == "$UC_PIN_SHA" ]]; then
if ! git merge-base --is-ancestor "$UC_PIN_SHA" origin/main 2>/dev/null; then
echo "ERROR: UC_PIN_SHA=$UC_PIN_SHA is not reachable from unitycatalog/unitycatalog main." >&2
echo " Pin must reference a commit on https://github.com/unitycatalog/unitycatalog/commits/main" >&2
if ! git rev-parse --verify "$UC_PIN_SHA^{commit}" >/dev/null 2>&1; then
echo "ERROR: UC_PIN_SHA=$UC_PIN_SHA could not be fetched from $UC_REPO." >&2
exit 1
fi
fi
Expand All @@ -161,7 +163,7 @@ fi
# coordinate. Applied as a persistent setting so it sticks across the two sbt invocations below.
SET_VERSION_CMD="set ThisBuild / version := \"$UC_VERSION\""

echo ">>> Building and publishing UC client + server to local Maven repo"
echo ">>> Building and publishing UC client + server + hadoop to local Maven repo"
./build/sbt \
"$SET_VERSION_CMD" \
"set client / Compile / packageDoc / publishArtifact := false" \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension

val spark = SparkSession.active

private lazy val deltaCatalogClient: Option[DeltaCatalogClient] =
Some(UCDeltaCatalogClient(delegate, spark))

private lazy val isUnityCatalog: Boolean = {
val delegateField = classOf[DelegatingCatalogExtension].getDeclaredField("delegate")
delegateField.setAccessible(true)
Expand Down Expand Up @@ -178,7 +181,6 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension
base
}
}
var locUriOpt = location.map(CatalogUtils.stringToURI)
val existingTableOpt = getExistingTableIfExists(id, Some(ident), operation)
// PROP_IS_MANAGED_LOCATION indicates that the table location is not user-specified but
// system-generated. The table should be created as managed table in this case.
Expand All @@ -193,10 +195,24 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension
} else {
CatalogTableType.EXTERNAL
}
// operation.isCreate covers CREATE and CREATE OR REPLACE when no existing table was found.
val ucDeltaApiCreate = if (isUnityCatalog && existingTableOpt.isEmpty && operation.isCreate) {
deltaCatalogClient.flatMap(_.prepareCreateTable(
ident,
tableType,
location.map(CatalogUtils.stringToURI)))
} else {
None
}
val locUriOpt = ucDeltaApiCreate.map(_.location).orElse(location.map(CatalogUtils.stringToURI))
val tablePropertiesWithUCDeltaApi =
tableProperties ++ ucDeltaApiCreate.map(_.tableProperties).getOrElse(Map.empty)
val writeOptionsWithUCDeltaApi =
writeOptions ++ ucDeltaApiCreate.map(_.storageProperties).getOrElse(Map.empty)
val loc = locUriOpt
.orElse(existingTableOpt.flatMap(_.storage.locationUri))
.getOrElse(spark.sessionState.catalog.defaultTablePath(id))
val storage = DataSource.buildStorageFormatFromOptions(writeOptions)
val storage = DataSource.buildStorageFormatFromOptions(writeOptionsWithUCDeltaApi)
.copy(locationUri = Option(loc))
val commentOpt = Option(allTableProperties.get("comment"))

Expand All @@ -209,7 +225,7 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension
provider = Some(DeltaSourceUtils.ALT_NAME),
partitionColumnNames = newPartitionColumns,
bucketSpec = newBucketSpec,
properties = tableProperties,
properties = tablePropertiesWithUCDeltaApi,
comment = commentOpt
)

Expand All @@ -223,7 +239,7 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension
val writer = sourceQuery.map { df =>
val catalogTbl = Some(tableDesc)
// For safety, only extract the file system options here, to create deltaLog.
val fileSystemOptions = writeOptions.filter { case (k, _) =>
val fileSystemOptions = writeOptionsWithUCDeltaApi.filter { case (k, _) =>
DeltaTableUtils.validDeltaTableHadoopPrefixes.exists(k.startsWith)
}
val deltaOptions = new DeltaOptions(
Expand Down Expand Up @@ -277,9 +293,14 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension
// Before this bug is fixed, we should only call the catalog plugin API to create tables
// if UC is enabled to replace `V2SessionCatalog`.
createTableFunc = Option.when(isUnityCatalog) {
v1Table => {
val t = V1Table(v1Table)
super.createTable(ident, t.columns(), t.partitioning, t.properties)
(v1Table, snapshot) => {
ucDeltaApiCreate match {
case Some(_) =>
deltaCatalogClient.foreach(_.createTable(ident, v1Table, snapshot))
case None =>
val t = V1Table(v1Table)
super.createTable(ident, t.columns(), t.partitioning, t.properties)
}
}
}).run(spark)

Expand All @@ -290,7 +311,14 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension
"DeltaCatalog", "loadTable") {
setVariantBlockingConfigIfUC()
try {
val table = super.loadTable(ident)
val table =
if (isPathIdentifier(ident)) {
loadPathTable(ident)
} else if (isIcebergPathIdentifier(ident)) {
newIcebergPathTable(ident)
} else {
deltaCatalogClient.flatMap(_.loadTable(ident)).getOrElse(super.loadTable(ident))
}

ServerSidePlannedTable.tryCreate(spark, ident, table, isUnityCatalog).foreach { sspt =>
return sspt
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright (2026) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.catalog

import java.net.URI

import org.apache.hadoop.fs.Path

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.connector.catalog.{Identifier, Table}
import org.apache.spark.sql.delta.Snapshot

/**
* Values returned by the UC Delta Rest Catalog API prepare-create step.
*
* @param location UC-chosen location where Delta should write the initial log.
* @param tableProperties properties added to the CatalogTable so the Delta commit uses the
* server-required protocol/features and UC table id.
* @param storageProperties Hadoop storage options, usually UC-vended credentials, added to the
* write options for the initial Delta commit.
*/
private[catalog] case class PreparedUCDeltaRestCatalogApiCreate(
location: URI,
tableProperties: Map[String, String],
storageProperties: Map[String, String])

/**
* Spark-facing Delta catalog API hook.
*
* <p>The interface is intentionally free of UC SDK and Hadoop credential dependencies so the shared
* catalog path does not depend on a specific UC client implementation.
*/
private[catalog] trait DeltaCatalogClient {
def loadTable(ident: Identifier): Option[Table]

def prepareCreateTable(
ident: Identifier,
tableType: CatalogTableType,
location: Option[URI]): Option[PreparedUCDeltaRestCatalogApiCreate]

def createTable(
ident: Identifier,
table: CatalogTable,
snapshot: Snapshot): Unit
}

private[delta] object DeltaCatalogClient {
private[catalog] val UCDeltaRestCatalogApiEnabledKey =
UCDeltaCatalogClient.UCDeltaRestCatalogApiEnabledKey
private[catalog] val RenewCredentialEnabledKey =
UCDeltaCatalogClient.RenewCredentialEnabledKey
private[catalog] val CredScopedFsEnabledKey =
UCDeltaCatalogClient.CredScopedFsEnabledKey

private[catalog] def deltaRestApiEnabledConf(catalogName: String): String = {
UCDeltaCatalogClient.deltaRestApiEnabledConf(catalogName)
}

private[catalog] def renewCredentialEnabledConf(catalogName: String): String = {
UCDeltaCatalogClient.renewCredentialEnabledConf(catalogName)
}

private[catalog] def credScopedFsEnabledConf(catalogName: String): String = {
UCDeltaCatalogClient.credScopedFsEnabledConf(catalogName)
}

private[delta] def pathCredentialOptions(
spark: SparkSession,
path: Path): Map[String, String] = {
UCDeltaCatalogClient.pathCredentialOptions(spark, path)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,19 @@ class DeltaTableV2 private(
PathInfo(new Path(catalogTable.get.location), Seq.empty, None)
} else {
val (rootPath, filters, timeTravel) =
DeltaDataSource.parsePathIdentifier(spark, path.toString, options)
DeltaDataSource.parsePathIdentifier(spark, path.toString, pathBasedOptions)
PathInfo(rootPath, filters, timeTravel)
}
}

private lazy val pathBasedOptions: Map[String, String] = {
if (catalogTable.isDefined) {
options
} else {
DeltaCatalogClient.pathCredentialOptions(spark, path) ++ options
}
}

private def rootPath = pathInfo.rootPath

private def partitionFilters = pathInfo.partitionFilters
Expand Down Expand Up @@ -122,7 +130,7 @@ class DeltaTableV2 private(
}
fileSystemOptions ++ options
} else {
options
pathBasedOptions
}
DeltaLog.forTable(
spark,
Expand Down
Loading
Loading