Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 53 additions & 9 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,22 @@ lazy val sparkV1 = (project in file("spark"))
"org.apache.spark" %% "spark-core" % sparkVersion.value % "provided",
"org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "provided",
// For DynamoDBCommitStore
"com.amazonaws" % "aws-java-sdk" % "1.12.262" % "provided",

"com.amazonaws" % "aws-java-sdk" % "1.12.262" % "provided"
) ++ {
if (unityCatalogVersion >= "0.5.0") {
Seq(
"io.unitycatalog" % "unitycatalog-hadoop" % unityCatalogVersion excludeAll(
ExclusionRule(organization = "org.openapitools"),
ExclusionRule(organization = "com.fasterxml.jackson.core"),
ExclusionRule(organization = "com.fasterxml.jackson.module"),
ExclusionRule(organization = "com.fasterxml.jackson.datatype"),
ExclusionRule(organization = "com.fasterxml.jackson.dataformat")
)
)
} else {
Seq.empty
}
} ++ Seq(
// Test deps
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"org.scalatestplus" %% "scalacheck-1-15" % "3.2.9.0" % "test",
Expand Down Expand Up @@ -660,16 +674,40 @@ lazy val spark = (project in file("spark-unified"))
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided",
"org.apache.spark" %% "spark-core" % sparkVersion.value % "provided",
"org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "provided",
"com.amazonaws" % "aws-java-sdk" % "1.12.262" % "provided",

"com.amazonaws" % "aws-java-sdk" % "1.12.262" % "provided"
) ++ {
if (unityCatalogVersion >= "0.5.0") {
Seq(
"io.unitycatalog" % "unitycatalog-hadoop" % unityCatalogVersion excludeAll(
ExclusionRule(organization = "org.openapitools"),
ExclusionRule(organization = "com.fasterxml.jackson.core"),
ExclusionRule(organization = "com.fasterxml.jackson.module"),
ExclusionRule(organization = "com.fasterxml.jackson.datatype"),
ExclusionRule(organization = "com.fasterxml.jackson.dataformat")
)
)
} else {
Seq.empty
}
} ++ Seq(
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"org.scalatestplus" %% "scalacheck-1-15" % "3.2.9.0" % "test",
"junit" % "junit" % "4.13.2" % "test",
"com.novocode" % "junit-interface" % "0.11" % "test",
"org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-hive" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-hive" % sparkVersion.value % "test" classifier "tests"
) ++ {
if (unityCatalogVersion >= "0.5.0") {
Seq(
// unitycatalog-hadoop references the ABFS token-provider interface during classloading.
"org.apache.hadoop" % "hadoop-azure" % hadoopVersion % "test"
)
} else {
Seq.empty
}
} ++ Seq(
"org.mockito" % "mockito-inline" % "4.11.0" % "test",
),

Expand Down Expand Up @@ -843,13 +881,19 @@ Global / ensurePinnedUnityCatalog := {
val home = file(sys.props("user.home"))
// Check both layouts: a restored sbt cache can pre-populate ivy alone, leaving m2 empty -
// checking only ivy would silently skip the slow publish and break mvn-based consumers.
val ivy2Canary = home / ".ivy2" / "local" / "io.unitycatalog" /
val ivy2ClientCanary = home / ".ivy2" / "local" / "io.unitycatalog" /
"unitycatalog-client" / unityCatalogVersion / "ivys" / "ivy.xml"
val m2Canary = home / ".m2" / "repository" / "io" / "unitycatalog" /
val m2ClientCanary = home / ".m2" / "repository" / "io" / "unitycatalog" /
"unitycatalog-client" / unityCatalogVersion /
s"unitycatalog-client-$unityCatalogVersion.pom"
if (!ivy2Canary.exists || !m2Canary.exists) {
publishPinnedUnityCatalog(log, ivy2Canary)
val ivy2HadoopCanary = home / ".ivy2" / "local" / "io.unitycatalog" /
"unitycatalog-hadoop" / unityCatalogVersion / "ivys" / "ivy.xml"
val m2HadoopCanary = home / ".m2" / "repository" / "io" / "unitycatalog" /
"unitycatalog-hadoop" / unityCatalogVersion /
s"unitycatalog-hadoop-$unityCatalogVersion.pom"
if (!Seq(ivy2ClientCanary, m2ClientCanary, ivy2HadoopCanary, m2HadoopCanary)
.forall(_.exists)) {
publishPinnedUnityCatalog(log, ivy2ClientCanary)
}
}
}
Expand Down
22 changes: 12 additions & 10 deletions project/scripts/setup_unitycatalog_main.sh
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ if [[ "${1:-}" == "--print-version" ]]; then
exit 0
fi

# Canonical Ivy + Maven artifact paths. Delta depends on all three UC modules; sbt resolves from
# Canonical Ivy + Maven artifact paths. Delta depends on these UC modules; sbt resolves from
# ~/.ivy2/local, mvn (kernel-examples integration tests) resolves from ~/.m2/repository. If any
# is missing in either layout we must re-publish.
IVY_LOCAL="$HOME/.ivy2/local/io.unitycatalog"
Expand Down Expand Up @@ -118,24 +118,26 @@ if [[ "$UC_FORCE" != "1" ]] && all_canaries_present; then
exit 0
fi

echo ">>> Fetching Unity Catalog main from $UC_REPO"
echo ">>> Fetching Unity Catalog from $UC_REPO"
rm -rf "$UC_DIR"
mkdir -p "$UC_DIR"
# Fetch main's full history so we can run `git merge-base --is-ancestor` below to verify the
# pinned SHA is actually on main. UC's repo is small; full fetch of one branch is cheap.
# Fetch the target branch so we can verify the pinned SHA is reachable.
git -C "$UC_DIR" init --quiet
git -C "$UC_DIR" remote add origin "$UC_REPO"
git -C "$UC_DIR" fetch --quiet origin main
if [[ "$UC_REF" == "main" ]]; then
git -C "$UC_DIR" fetch --quiet origin main
else
git -C "$UC_DIR" fetch --quiet origin "$UC_PIN_SHA"
fi

cd "$UC_DIR"

# Safety check: the pinned SHA must be reachable from UC main. Local `merge-base --is-ancestor`
# Safety check: the pinned SHA must be reachable from the fetched branch. Local `merge-base --is-ancestor`
# on the history we just fetched - no GitHub API, no token needed. Only applies when UC_REF is
# the pinned SHA; UC_REF=main is trivially on main.
if [[ "$UC_REF" == "$UC_PIN_SHA" ]]; then
if ! git merge-base --is-ancestor "$UC_PIN_SHA" origin/main 2>/dev/null; then
echo "ERROR: UC_PIN_SHA=$UC_PIN_SHA is not reachable from unitycatalog/unitycatalog main." >&2
echo " Pin must reference a commit on https://github.com/unitycatalog/unitycatalog/commits/main" >&2
if ! git rev-parse --verify "$UC_PIN_SHA^{commit}" >/dev/null 2>&1; then
echo "ERROR: UC_PIN_SHA=$UC_PIN_SHA could not be fetched from $UC_REPO." >&2
exit 1
fi
fi
Expand All @@ -161,7 +163,7 @@ fi
# coordinate. Applied as a persistent setting so it sticks across the two sbt invocations below.
SET_VERSION_CMD="set ThisBuild / version := \"$UC_VERSION\""

echo ">>> Building and publishing UC client + server to local Maven repo"
echo ">>> Building and publishing UC client + server + hadoop to local Maven repo"
./build/sbt \
"$SET_VERSION_CMD" \
"set client / Compile / packageDoc / publishArtifact := false" \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension

val spark = SparkSession.active

private lazy val deltaCatalogClient: Option[DeltaCatalogClient] =
Some(UCDeltaCatalogClient(delegate, spark))

private lazy val isUnityCatalog: Boolean = {
val delegateField = classOf[DelegatingCatalogExtension].getDeclaredField("delegate")
delegateField.setAccessible(true)
Expand Down Expand Up @@ -290,7 +293,7 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension
"DeltaCatalog", "loadTable") {
setVariantBlockingConfigIfUC()
try {
val table = super.loadTable(ident)
val table = deltaCatalogClient.flatMap(_.loadTable(ident)).getOrElse(super.loadTable(ident))

ServerSidePlannedTable.tryCreate(spark, ident, table, isUnityCatalog).foreach { sspt =>
return sspt
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (2026) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.catalog

import org.apache.spark.sql.connector.catalog.{Identifier, Table}

/**
* Spark-facing Delta catalog API hook.
*
* <p>The interface is intentionally free of UC SDK and Hadoop credential dependencies so the shared
* catalog path does not depend on a specific UC client implementation.
*/
private[catalog] trait DeltaCatalogClient {
def loadTable(ident: Identifier): Option[Table]
}
Loading
Loading