Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 54 additions & 9 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,22 @@ lazy val sparkV1 = (project in file("spark"))
"org.apache.spark" %% "spark-core" % sparkVersion.value % "provided",
"org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "provided",
// For DynamoDBCommitStore
"com.amazonaws" % "aws-java-sdk" % "1.12.262" % "provided",

"com.amazonaws" % "aws-java-sdk" % "1.12.262" % "provided"
) ++ {
if (unityCatalogVersion >= "0.5.0") {
Seq(
"io.unitycatalog" % "unitycatalog-hadoop" % unityCatalogVersion excludeAll(
ExclusionRule(organization = "org.openapitools"),
ExclusionRule(organization = "com.fasterxml.jackson.core"),
ExclusionRule(organization = "com.fasterxml.jackson.module"),
ExclusionRule(organization = "com.fasterxml.jackson.datatype"),
ExclusionRule(organization = "com.fasterxml.jackson.dataformat")
)
)
} else {
Seq.empty
}
} ++ Seq(
// Test deps
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"org.scalatestplus" %% "scalacheck-1-15" % "3.2.9.0" % "test",
Expand Down Expand Up @@ -660,16 +674,40 @@ lazy val spark = (project in file("spark-unified"))
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided",
"org.apache.spark" %% "spark-core" % sparkVersion.value % "provided",
"org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "provided",
"com.amazonaws" % "aws-java-sdk" % "1.12.262" % "provided",

"com.amazonaws" % "aws-java-sdk" % "1.12.262" % "provided"
) ++ {
if (unityCatalogVersion >= "0.5.0") {
Seq(
"io.unitycatalog" % "unitycatalog-hadoop" % unityCatalogVersion excludeAll(
ExclusionRule(organization = "org.openapitools"),
ExclusionRule(organization = "com.fasterxml.jackson.core"),
ExclusionRule(organization = "com.fasterxml.jackson.module"),
ExclusionRule(organization = "com.fasterxml.jackson.datatype"),
ExclusionRule(organization = "com.fasterxml.jackson.dataformat")
)
)
} else {
Seq.empty
}
} ++ Seq(
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"org.scalatestplus" %% "scalacheck-1-15" % "3.2.9.0" % "test",
"junit" % "junit" % "4.13.2" % "test",
"com.novocode" % "junit-interface" % "0.11" % "test",
"org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-hive" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-hive" % sparkVersion.value % "test" classifier "tests"
) ++ {
if (unityCatalogVersion >= "0.5.0") {
Seq(
// unitycatalog-hadoop references the ABFS token-provider interface during classloading.
"org.apache.hadoop" % "hadoop-azure" % hadoopVersion % "test"
)
} else {
Seq.empty
}
} ++ Seq(
"org.mockito" % "mockito-inline" % "4.11.0" % "test",
),

Expand Down Expand Up @@ -843,13 +881,19 @@ Global / ensurePinnedUnityCatalog := {
val home = file(sys.props("user.home"))
// Check both layouts: a restored sbt cache can pre-populate ivy alone, leaving m2 empty -
// checking only ivy would silently skip the slow publish and break mvn-based consumers.
val ivy2Canary = home / ".ivy2" / "local" / "io.unitycatalog" /
val ivy2ClientCanary = home / ".ivy2" / "local" / "io.unitycatalog" /
"unitycatalog-client" / unityCatalogVersion / "ivys" / "ivy.xml"
val m2Canary = home / ".m2" / "repository" / "io" / "unitycatalog" /
val m2ClientCanary = home / ".m2" / "repository" / "io" / "unitycatalog" /
"unitycatalog-client" / unityCatalogVersion /
s"unitycatalog-client-$unityCatalogVersion.pom"
if (!ivy2Canary.exists || !m2Canary.exists) {
publishPinnedUnityCatalog(log, ivy2Canary)
val ivy2HadoopCanary = home / ".ivy2" / "local" / "io.unitycatalog" /
"unitycatalog-hadoop" / unityCatalogVersion / "ivys" / "ivy.xml"
val m2HadoopCanary = home / ".m2" / "repository" / "io" / "unitycatalog" /
"unitycatalog-hadoop" / unityCatalogVersion /
s"unitycatalog-hadoop-$unityCatalogVersion.pom"
if (!Seq(ivy2ClientCanary, m2ClientCanary, ivy2HadoopCanary, m2HadoopCanary)
.forall(_.exists)) {
publishPinnedUnityCatalog(log, ivy2ClientCanary)
}
}
}
Expand Down Expand Up @@ -1197,6 +1241,7 @@ lazy val storage = (project in file("storage"))
commonSettings,
exportJars := true,
javaOnlyReleaseSettings,

libraryDependencies ++= Seq(
// User can provide any 2.x or 3.x version. We don't use any new fancy APIs. Watch out for
// versions with known vulnerabilities.
Expand Down
24 changes: 13 additions & 11 deletions project/scripts/setup_unitycatalog_main.sh
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ set -euo pipefail
# The pin. Bump both lines together if UC's version.sbt changed at the new SHA. build.sbt's
# `unityCatalogVersion` is obtained by running this script with `--print-version`, so these two
# values are the single source of truth.
UC_PIN_SHA=e6cd5cd6acdefaa4f7d9bd3814075176aac43132
UC_PIN_SHA=0549acd25e4f149cd524322b4c6e2e10b1b187b1
UC_BASE_VERSION=0.5.0-SNAPSHOT
# ---------------------------------------------------------------------------------------------

Expand Down Expand Up @@ -89,7 +89,7 @@ if [[ "${1:-}" == "--print-version" ]]; then
exit 0
fi

# Canonical Ivy + Maven artifact paths. Delta depends on all three UC modules; sbt resolves from
# Canonical Ivy + Maven artifact paths. Delta depends on these UC modules; sbt resolves from
# ~/.ivy2/local, mvn (kernel-examples integration tests) resolves from ~/.m2/repository. If any
# is missing in either layout we must re-publish.
IVY_LOCAL="$HOME/.ivy2/local/io.unitycatalog"
Expand Down Expand Up @@ -118,24 +118,26 @@ if [[ "$UC_FORCE" != "1" ]] && all_canaries_present; then
exit 0
fi

echo ">>> Fetching Unity Catalog main from $UC_REPO"
echo ">>> Fetching Unity Catalog from $UC_REPO"
rm -rf "$UC_DIR"
mkdir -p "$UC_DIR"
# Fetch main's full history so we can run `git merge-base --is-ancestor` below to verify the
# pinned SHA is actually on main. UC's repo is small; full fetch of one branch is cheap.
# Fetch the target branch so we can verify the pinned SHA is reachable.
git -C "$UC_DIR" init --quiet
git -C "$UC_DIR" remote add origin "$UC_REPO"
git -C "$UC_DIR" fetch --quiet origin main
if [[ "$UC_REF" == "main" ]]; then
git -C "$UC_DIR" fetch --quiet origin main
else
git -C "$UC_DIR" fetch --quiet origin "$UC_PIN_SHA"
fi

cd "$UC_DIR"

# Safety check: the pinned SHA must be reachable from UC main. Local `merge-base --is-ancestor`
# Safety check: the pinned SHA must be reachable from the fetched branch. Local `merge-base --is-ancestor`
# on the history we just fetched - no GitHub API, no token needed. Only applies when UC_REF is
# the pinned SHA; UC_REF=main is trivially on main.
if [[ "$UC_REF" == "$UC_PIN_SHA" ]]; then
if ! git merge-base --is-ancestor "$UC_PIN_SHA" origin/main 2>/dev/null; then
echo "ERROR: UC_PIN_SHA=$UC_PIN_SHA is not reachable from unitycatalog/unitycatalog main." >&2
echo " Pin must reference a commit on https://github.com/unitycatalog/unitycatalog/commits/main" >&2
if ! git rev-parse --verify "$UC_PIN_SHA^{commit}" >/dev/null 2>&1; then
echo "ERROR: UC_PIN_SHA=$UC_PIN_SHA could not be fetched from $UC_REPO." >&2
exit 1
fi
fi
Expand All @@ -161,7 +163,7 @@ fi
# coordinate. Applied as a persistent setting so it sticks across the two sbt invocations below.
SET_VERSION_CMD="set ThisBuild / version := \"$UC_VERSION\""

echo ">>> Building and publishing UC client + server to local Maven repo"
echo ">>> Building and publishing UC client + server + hadoop to local Maven repo"
./build/sbt \
"$SET_VERSION_CMD" \
"set client / Compile / packageDoc / publishArtifact := false" \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension

val spark = SparkSession.active

private lazy val deltaCatalogClient: Option[DeltaCatalogClient] =
Some(UCDeltaCatalogClient(delegate, spark))

private lazy val isUnityCatalog: Boolean = {
val delegateField = classOf[DelegatingCatalogExtension].getDeclaredField("delegate")
delegateField.setAccessible(true)
Expand Down Expand Up @@ -178,7 +181,6 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension
base
}
}
var locUriOpt = location.map(CatalogUtils.stringToURI)
val existingTableOpt = getExistingTableIfExists(id, Some(ident), operation)
// PROP_IS_MANAGED_LOCATION indicates that the table location is not user-specified but
// system-generated. The table should be created as managed table in this case.
Expand All @@ -193,10 +195,24 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension
} else {
CatalogTableType.EXTERNAL
}
// operation.isCreate covers CREATE and CREATE OR REPLACE when no existing table was found.
val ucDeltaApiCreate = if (isUnityCatalog && existingTableOpt.isEmpty && operation.isCreate) {
deltaCatalogClient.flatMap(_.prepareCreateTable(
ident,
tableType,
location.map(CatalogUtils.stringToURI)))
} else {
None
}
val locUriOpt = ucDeltaApiCreate.map(_.location).orElse(location.map(CatalogUtils.stringToURI))
val tablePropertiesWithUCDeltaApi =
tableProperties ++ ucDeltaApiCreate.map(_.tableProperties).getOrElse(Map.empty)
val writeOptionsWithUCDeltaApi =
writeOptions ++ ucDeltaApiCreate.map(_.storageProperties).getOrElse(Map.empty)
val loc = locUriOpt
.orElse(existingTableOpt.flatMap(_.storage.locationUri))
.getOrElse(spark.sessionState.catalog.defaultTablePath(id))
val storage = DataSource.buildStorageFormatFromOptions(writeOptions)
val storage = DataSource.buildStorageFormatFromOptions(writeOptionsWithUCDeltaApi)
.copy(locationUri = Option(loc))
val commentOpt = Option(allTableProperties.get("comment"))

Expand All @@ -209,7 +225,7 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension
provider = Some(DeltaSourceUtils.ALT_NAME),
partitionColumnNames = newPartitionColumns,
bucketSpec = newBucketSpec,
properties = tableProperties,
properties = tablePropertiesWithUCDeltaApi,
comment = commentOpt
)

Expand All @@ -223,7 +239,7 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension
val writer = sourceQuery.map { df =>
val catalogTbl = Some(tableDesc)
// For safety, only extract the file system options here, to create deltaLog.
val fileSystemOptions = writeOptions.filter { case (k, _) =>
val fileSystemOptions = writeOptionsWithUCDeltaApi.filter { case (k, _) =>
DeltaTableUtils.validDeltaTableHadoopPrefixes.exists(k.startsWith)
}
val deltaOptions = new DeltaOptions(
Expand Down Expand Up @@ -277,9 +293,14 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension
// Before this bug is fixed, we should only call the catalog plugin API to create tables
// if UC is enabled to replace `V2SessionCatalog`.
createTableFunc = Option.when(isUnityCatalog) {
v1Table => {
val t = V1Table(v1Table)
super.createTable(ident, t.columns(), t.partitioning, t.properties)
(v1Table, snapshot) => {
ucDeltaApiCreate match {
case Some(_) =>
deltaCatalogClient.foreach(_.createTable(ident, v1Table, snapshot))
case None =>
val t = V1Table(v1Table)
super.createTable(ident, t.columns(), t.partitioning, t.properties)
}
}
}).run(spark)

Expand All @@ -290,7 +311,14 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension
"DeltaCatalog", "loadTable") {
setVariantBlockingConfigIfUC()
try {
val table = super.loadTable(ident)
val table =
if (isPathIdentifier(ident)) {
loadPathTable(ident)
} else if (isIcebergPathIdentifier(ident)) {
newIcebergPathTable(ident)
} else {
deltaCatalogClient.flatMap(_.loadTable(ident)).getOrElse(super.loadTable(ident))
}

ServerSidePlannedTable.tryCreate(spark, ident, table, isUnityCatalog).foreach { sspt =>
return sspt
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright (2026) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.catalog

import java.net.URI

import org.apache.hadoop.fs.Path

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.connector.catalog.{Identifier, Table}
import org.apache.spark.sql.delta.Snapshot

/**
* Values returned by the UC Delta Rest Catalog API prepare-create step.
*
* @param location UC-chosen location where Delta should write the initial log.
* @param tableProperties properties added to the CatalogTable so the Delta commit uses the
* server-required protocol/features and UC table id.
* @param storageProperties Hadoop storage options, usually UC-vended credentials, added to the
* write options for the initial Delta commit.
*/
private[catalog] case class PreparedUCDeltaRestCatalogApiCreate(
location: URI,
tableProperties: Map[String, String],
storageProperties: Map[String, String])

/**
* Spark-facing Delta catalog API hook.
*
* <p>The interface is intentionally free of UC SDK and Hadoop credential dependencies so the shared
* catalog path does not depend on a specific UC client implementation.
*/
private[catalog] trait DeltaCatalogClient {
def loadTable(ident: Identifier): Option[Table]

def prepareCreateTable(
ident: Identifier,
tableType: CatalogTableType,
location: Option[URI]): Option[PreparedUCDeltaRestCatalogApiCreate]

def createTable(
ident: Identifier,
table: CatalogTable,
snapshot: Snapshot): Unit
}

private[delta] object DeltaCatalogClient {
private[catalog] val UCDeltaRestCatalogApiEnabledKey =
UCDeltaCatalogClient.UCDeltaRestCatalogApiEnabledKey
private[catalog] val RenewCredentialEnabledKey =
UCDeltaCatalogClient.RenewCredentialEnabledKey
private[catalog] val CredScopedFsEnabledKey =
UCDeltaCatalogClient.CredScopedFsEnabledKey

private[catalog] def deltaRestApiEnabledConf(catalogName: String): String = {
UCDeltaCatalogClient.deltaRestApiEnabledConf(catalogName)
}

private[catalog] def renewCredentialEnabledConf(catalogName: String): String = {
UCDeltaCatalogClient.renewCredentialEnabledConf(catalogName)
}

private[catalog] def credScopedFsEnabledConf(catalogName: String): String = {
UCDeltaCatalogClient.credScopedFsEnabledConf(catalogName)
}

private[delta] def pathCredentialOptions(
spark: SparkSession,
path: Path): Map[String, String] = {
UCDeltaCatalogClient.pathCredentialOptions(spark, path)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,19 @@ class DeltaTableV2 private(
PathInfo(new Path(catalogTable.get.location), Seq.empty, None)
} else {
val (rootPath, filters, timeTravel) =
DeltaDataSource.parsePathIdentifier(spark, path.toString, options)
DeltaDataSource.parsePathIdentifier(spark, path.toString, pathBasedOptions)
PathInfo(rootPath, filters, timeTravel)
}
}

private lazy val pathBasedOptions: Map[String, String] = {
if (catalogTable.isDefined) {
options
} else {
DeltaCatalogClient.pathCredentialOptions(spark, path) ++ options
}
}

private def rootPath = pathInfo.rootPath

private def partitionFilters = pathInfo.partitionFilters
Expand Down Expand Up @@ -122,7 +130,7 @@ class DeltaTableV2 private(
}
fileSystemOptions ++ options
} else {
options
pathBasedOptions
}
DeltaLog.forTable(
spark,
Expand Down
Loading
Loading