[Spark] Route Delta loadTable through the Delta REST API#6796
Conversation
789bcb3 to
e648e8b
Compare
| override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = { | ||
| super.initialize(name, options) | ||
| deltaCatalogClient = | ||
| UCDeltaCatalogClientImpl.fromCatalogOptionsIfEnabled(name, options, super.loadTable _) |
There was a problem hiding this comment.
we already have the if(deltaCatalogClient != null ) use deltaCatalogClient otherwise super.loadTable here https://github.com/delta-io/delta/pull/6796/changes#diff-b26a0d050166968a3e99afed704340a0820a1dc69137ee3fb354c86826367f9cR303-R307
then why do we still need the defaultFallbackLoadTable to fallback to super.loadTable ? do you think we can just remove this ?
There was a problem hiding this comment.
UCDeltaCatalogClientImpl
Here we cannot directly use the UCDeltaCatalogClientImpl right ? since we will need the class loader to dynamically load the class, so that we can decouple the UCDeltaCatalogClientImpl dependency and dummy implementation.
For my UCClient, I did like this: https://github.com/delta-io/delta/pull/6792/changes#diff-e713dd4904f1e14bc5a90fc8517d818c6aa12655799e935e6f7a020d950c47ceR293-R306
There was a problem hiding this comment.
nit: My intellij suggest me to simplify the super.loadTable _ as super.loadTable .
There was a problem hiding this comment.
defaultFallbackLoadTable is for when loadTable got an error from server because the table isn't a Managed Delta table. This is the only place the caller doesn't know the type of table so it has to try and fallback.
| case e: UnsupportedTableFormatException => | ||
| logInfo(log"Table ${MDC(DeltaLogKeys.TABLE_NAME, ident)} is not in Delta format; " + | ||
| log"falling back to the legacy catalog path. Cause: " + | ||
| log"${MDC(DeltaLogKeys.EXCEPTION, e.getMessage)}") | ||
| return fallbackLoadTable(ident) |
There was a problem hiding this comment.
no, that's not good, because we cannot identify the table format in advance,then we have to try the ucClient.loadTable firstly, and then fallback to use the default loadTable ?
Let's discuss this.
There was a problem hiding this comment.
Yes that's what this code does.
e648e8b to
1eac20a
Compare
| val restClient = UCDeltaTokenBasedRestClient.forStaticToken( | ||
| uri, | ||
| token, | ||
| appVersions, | ||
| renewCredEnabled, | ||
| credScopedFsEnabled, | ||
| hadoopConfSupplier) | ||
| new UCDeltaCatalogClientImpl(catalogName, restClient, sspEnabled, fallbackLoadTable) |
There was a problem hiding this comment.
Once this PR (#6792) get merged, you will need use the standard UCClientFactory trait to initialize your UCDeltaClient.
| val hadoopConfSupplier: Supplier[Configuration] = | ||
| () => SparkSession.getActiveSession | ||
| .map(_.sparkContext.hadoopConfiguration) | ||
| .getOrElse(new Configuration()) |
There was a problem hiding this comment.
oh, we have to seriously consider the hadoopConfSupplier, since I want to abstract the the approach to initialize UCDeltaTokenBasedRestClient via Map<String,String>, while you case seems like we need the hadoopConf to initialize the table cred props via the unitycatalog-hadoop.
Let's discuss.
1eac20a to
9818288
Compare
| val spark = SparkSession.active | ||
|
|
||
| /** Non-null when the catalog opted into the Delta REST API path via `deltaRestApi.enabled`. */ | ||
| private[catalog] var deltaCatalogClient: DeltaCatalogClient = null |
There was a problem hiding this comment.
nit: if this deltaCatalogClient is explicitly designed to be nullable, will it make more sense to define it as Option[DeltaCatalogClient] ?
| override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = { | ||
| super.initialize(name, options) | ||
| deltaCatalogClient = | ||
| UCDeltaCatalogClientImpl.fromCatalogOptionsIfEnabled(name, options, super.loadTable _) |
There was a problem hiding this comment.
nit: My intellij suggest me to simplify the super.loadTable _ as super.loadTable .
| () => SparkSession.getActiveSession | ||
| .map(_.sparkContext.hadoopConfiguration) | ||
| .getOrElse(new Configuration()) | ||
| val restClient = UCDeltaTokenBasedRestClient.forStaticToken( |
There was a problem hiding this comment.
Hi @yili-db , this is incorrect. since we will need to support all general token providers:
- static unitycatalog token provider .
- OAuth token provider.
- Customize token provider - some external contributors will contribute and customize their token provider to get the token from federate oauth service.
so here we cannot limit to use the static token. and we have to use the general token auth config to initialize the TokenProvider here.
Just like the code here:
| } catch (IllegalArgumentException malformed) { | ||
| // UC Hadoop's response validator (DeltaStorageCredentialUtil.requireSingleCloudConfig) | ||
| // throws when the scheme has no cloud cred (e.g. file://). Treat as no creds. | ||
| return Collections.emptyMap(); |
There was a problem hiding this comment.
In the UCSingleCatalog, we have the code to fallback to use the server side planning https://github.com/unitycatalog/unitycatalog/blob/main/connectors/spark/src/main/scala/io/unitycatalog/spark/UCSingleCatalog.scala#L596-L602.
So my question is: how will your code affect the server side planning , once we switch to new catalog endpoint .
There was a problem hiding this comment.
The same logic has been added in this PR. Search for "enableServerSidePlanningConfig"
| /** Static-token factory; hides {@link TokenProvider} from callers. */ | ||
| public static UCDeltaTokenBasedRestClient forStaticToken( | ||
| String baseUri, | ||
| String token, | ||
| Map<String, String> appVersions, | ||
| boolean credentialRenewalEnabled, | ||
| boolean credentialScopedFsEnabled, | ||
| Supplier<Configuration> hadoopConfSupplier) { | ||
| Objects.requireNonNull(token, "token must not be null"); | ||
| Map<String, String> tokenConfigs = new LinkedHashMap<>(); | ||
| tokenConfigs.put("type", "static"); | ||
| tokenConfigs.put("token", token); | ||
| return new UCDeltaTokenBasedRestClient( | ||
| baseUri, | ||
| TokenProvider.create(tokenConfigs), | ||
| appVersions, | ||
| credentialRenewalEnabled, | ||
| credentialScopedFsEnabled, | ||
| hadoopConfSupplier); | ||
| } |
There was a problem hiding this comment.
we shouldn't use the static auth config to initialize the TokenProvider, since we already support both static, oauth, and customize token provider.
we will need to reuse the PR code to get a more general token provider, which won't perceive any specific auth config keys to initialize the general provider.
| // UC's loadTable response carries the UC table_uuid (exposed via TableInfo.getUcTableId), | ||
| // not the Delta Metadata.id. The Delta id only lives in the Delta log Metadata action and | ||
| // is never sent to UC; callers that need it must read the log. |
There was a problem hiding this comment.
I did not get why the m.getTableUuid is incorrect ? and if the m.getTableUuId is wrong, should we add the tableId in the constructor, so that here we can return a correct tableId to the upper layer, rather than giving a null ?
There was a problem hiding this comment.
m.getTableUuid is the internal table ID recorded in Delta commit log. This ID is different than UC table ID is is never sent to UC.
So this UC loadtable result has to return null for it.
| public static final class TableInfo { | ||
|
|
||
| private final String location; | ||
| private final String ucTableId; |
There was a problem hiding this comment.
I think we'd better to align with the StagingTableInfo's tableId, let's use the same field name, otherwise it will easily confuse the reviewer, and wonder what's the difference between the two .
There was a problem hiding this comment.
nit: pls also keep the same definition order as the StagingTableInfo.
| // Surface as a typed failure so callers with a fallback (e.g. server-side planning) can | ||
| // recover. The exception carries the catalog-side TableInfo (with empty storageProperties) | ||
| // so the caller can still build a CatalogTable. | ||
| TableInfo withoutCreds = new TableInfo( | ||
| location, ucTableId, tableType, adapted, Collections.emptyMap()); |
There was a problem hiding this comment.
This seems to be buggy. as I see the following code actually just throw the CredentialFetchFailedException , and the withoutCreds actually is never used. and the tricky thing is: TableInfo does not implement the toString, so the String.format actually will just print the java object id for it.
There was a problem hiding this comment.
And if you want to handle the fallback when serverPlanEnabled, then why just move the entire fallback logic inside the fetchTableCredentials ? will it make more sense to make the fetchTableCredentials more focused, and make here more clear ?
There was a problem hiding this comment.
withoutCreds is embedded in the exception returned to caller so that caller can continue with a tableInfo but without cred. And it's not a parameter of String.format.
fetchTableCredentials is about getting the Map<String,String> of credentials, while here it's more about "return the remaining part of tableInfo back to caller in the form of exception". So it can not be handled in fetchTableCredentials.
9818288 to
14937a2
Compare
| * Backend hook used by [[AbstractDeltaCatalog]] for catalog-specific table loading; isolates | ||
| * catalog-client and credential-vending plumbing from `AbstractDeltaCatalog`. | ||
| */ | ||
| private[catalog] trait DeltaCatalogClient { |
| private final TableType tableType; | ||
| private final String location; | ||
| private final AbstractMetadata metadata; | ||
| private final Map<String, String> storageProperties; |
There was a problem hiding this comment.
Hi @yili-db, do we also want to include UniForm metadata here? I can also do it in a follow-up PR if needed
There was a problem hiding this comment.
Yes UniForm can come as a follow-up.
dc1baf8 to
de1bb8d
Compare
8d69cf8 to
8fcaaf1
Compare
| assert(v1.identifier.table === "tbl") | ||
| assert(v1.identifier.database === Some("sch")) | ||
| assert(v1.identifier.catalog === Some("main")) | ||
| assert(v1.tableType === CatalogTableType.EXTERNAL) |
There was a problem hiding this comment.
Will the UCDeltaClient support to load the EXTERNAL TABLE ?
There was a problem hiding this comment.
Eventually it will. But at this stage server ins't ready to serve external tables yet so server would reject the request.
This is a stubbed test so it just wants to test any returned enum value.
| /** Returns a fixed [[TableInfo]] from {@code loadTable}; throws elsewhere. */ | ||
| private class StubUCDeltaClient(info: TableInfo) extends UCDeltaClient { | ||
| override def getMetastoreId(): String = throw new UnsupportedOperationException | ||
| override def loadTable(tableIdentifier: StorageTableIdentifier): TableInfo = info |
There was a problem hiding this comment.
Seems like you are using the StubUCDeltaClient to mock the UCDeltaClient, I think as we add more and more API, then it will become more and more harder for us to mock the method.
There was a problem hiding this comment.
That's the only way to do a mocked unit test. Each additional method would repeat the function signature and then a throw.
There was a problem hiding this comment.
Will it possible for us to follow the InMemoryUCClient to have our own InMemoryUCDeltaClient, with this, we can fully simulate any kinds of cases for testing purpose.
Essentially, it use InMemoryUCCommitCoordinator to maintain an in-memory map to keep all those alive table instances. perTableMap
There was a problem hiding this comment.
Eventually yes. We need to implement the entire table lifecycle with a InMemoryUCDeltaClient, then we can use it for testing purpose.
| oldMetadata: java.util.Optional[AbstractMetadata], | ||
| newMetadata: java.util.Optional[AbstractMetadata], | ||
| oldProtocol: java.util.Optional[io.delta.storage.commit.actions.AbstractProtocol], | ||
| newProtocol: java.util.Optional[io.delta.storage.commit.actions.AbstractProtocol], | ||
| uniform: java.util.Optional[io.delta.storage.commit.uniform.UniformMetadata]): Unit = |
There was a problem hiding this comment.
nit: I will suggest more to import those classes, rather than use the full qualified class name.
| private val noFallback: Identifier => Table = | ||
| _ => throw new UnsupportedOperationException("fallback not expected in this test") |
There was a problem hiding this comment.
do we already cover the UnsupportedTableFormatException case in UCDeltaCatalogClientImpl, so that even if it fallback to the fallbackLoadTable, it still can correctly load the table from the fallbackLoadTable, that's the most exceptional case I think.
There was a problem hiding this comment.
Added UCDeltaTableNonDeltaFallbackTest.java
| } catch { | ||
| case _: ClassNotFoundException => | ||
| logWarning(s"'$UC_DELTA_REST_API_ENABLED_KEY' is true but " + | ||
| s"$UC_DELTA_CATALOG_CLIENT_IMPL_CLASS_NAME is not on the classpath; skipping it.") |
There was a problem hiding this comment.
if we are failed to load the class based on the class name, will it be more straightforward to throw the exception, rather return a null, since the null AbstractDeltaCatalogClient , will always trigger to use the non delta rest catalog even if we explicitly configured the deltaRestApi.enabled=true.
openinx
left a comment
There was a problem hiding this comment.
[AI Generated] Left comments. Architecture is sound (trait + factory + reflective loading provides good decoupling). Main concerns: (1) mutable var=null, (2) test-only global counters in production, (3) side-effecting session config mutation in loadTable, (4) missing unit tests for fallback/exception paths.
| * storage-credential vending) rather than going through the Spark | ||
| * [[org.apache.spark.sql.connector.catalog.TableCatalog]] API. | ||
| */ | ||
| private[catalog] var deltaCatalogClient: AbstractDeltaCatalogClient = null |
There was a problem hiding this comment.
[AI Generated] Using var ... = null in Scala is an anti-pattern. As @openinx noted, this should be Option[AbstractDeltaCatalogClient]. Using Option makes nullability explicit at the type level, removes NPE risk, and is idiomatic Scala. The null-check in loadTable would become:
deltaCatalogClient.map(_.loadTable(ident)).getOrElse(super.loadTable(ident))|
|
||
| object UCDeltaCatalogClientImpl extends AbstractDeltaCatalogClientFactory with Logging { | ||
| /** Bumped at every loadTable entry, regardless of outcome. */ | ||
| val LOAD_TABLE_INVOCATIONS: AtomicLong = new AtomicLong(0L) |
There was a problem hiding this comment.
[AI Generated] These LOAD_TABLE_INVOCATIONS and SUCCESSFUL_DELTA_REST_API_LOADS are test-only instrumentation leaking into production code, coupling the implementation to test assertions in UCDeltaTableIntegrationBaseTest. Consider:
- Moving the counters behind a trait/mixin that only test subclasses activate, or
- Using Spark metrics/listeners rather than global mutable state.
If these must stay, at minimum make them private[delta] rather than fully public, and add a doc comment explaining they are test hooks.
There was a problem hiding this comment.
Made them private and only the get method is public.
| logWarning( | ||
| s"Credential fetch failed for ${fullQualifiedTableName(tid)}; enabling " + | ||
| s"server-side planning fallback. Cause: ${e.getMessage}") | ||
| enableServerSidePlanningConfig(ident) |
There was a problem hiding this comment.
[AI Generated] Two concerns here:
-
Side-effect at a distance:
spark.conf.set(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key, "true")silently mutates global session state during aloadTablecall. This is surprising to callers and could affect unrelated tables loaded in the same session. Is this intentional? If SSP should only apply per-table, this approach is incorrect. -
Inconsistent logging:
logWarning(s"Credential fetch failed for ...")uses unstructured string interpolation. All other log calls in this class use the structuredlog"..." + MDC(...)pattern. Please align for consistency.
There was a problem hiding this comment.
This is the same behavior of UCSingleCatalog.
| val factory = try { | ||
| // scalastyle:off classforname | ||
| val cls = Class.forName(UC_DELTA_CATALOG_CLIENT_IMPL_CLASS_NAME + "$") | ||
| // scalastyle:on classforname |
There was a problem hiding this comment.
[AI Generated] Only ClassNotFoundException is caught here. If the class is present but is the wrong version (e.g., doesn't implement AbstractDeltaCatalogClientFactory), a ClassCastException or NoSuchFieldException will bubble up with no context. Consider catching a broader set or wrapping in a Try with a meaningful error message.
| val ns = ident.namespace() | ||
| require( | ||
| ns.length == 1, | ||
| s"UC identifiers must be of the form <schema>.<table>; got namespace ${ns.mkString(".")}") |
There was a problem hiding this comment.
[AI Generated] nit: The error message says "UC identifiers must be of the form <schema>.<table>" but doesn't include what was actually received. Consider including ns.length and the actual namespace in the message for debuggability:
require(ns.length == 1,
s"UC identifiers must be of the form <schema>.<table>; got namespace " +
s"of length ${ns.length}: ${ns.mkString(".")})")| assert(result != null) | ||
| } | ||
|
|
||
| private val noFallback: Identifier => Table = |
There was a problem hiding this comment.
[AI Generated] Missing test coverage for the most critical fallback paths:
- When
UCDeltaClient.loadTablethrowsUnsupportedTableFormatException, verifyfallbackLoadTableis correctly invoked and returns a valid table. - When
CredentialFetchFailedExceptionis thrown withserverSidePlanningEnabled=true, verify the SSP config is set and the table info without credentials is returned.
These are the most important exception paths in UCDeltaCatalogClientImpl.loadTable.
There was a problem hiding this comment.
UnsupportedTableFormatException: tested in UCDeltaTableNonDeltaFallbackTest.java
CredentialFetchFailedException: tested in loadTable falls back to SSP on CredentialFetchFailedException when SSP is enabled
| } | ||
|
|
||
| /** Returns a fixed [[TableInfo]] from {@code loadTable}; throws elsewhere. */ | ||
| private class StubUCDeltaClient(info: TableInfo) extends UCDeltaClient { |
There was a problem hiding this comment.
[AI Generated] nit: Every time a new method is added to UCDeltaClient, this stub needs updating. Consider a base abstract class with default throw new UnsupportedOperationException for all methods that tests selectively override, or using a mock library.
There was a problem hiding this comment.
Currently there is only one stub class.
| private long deltaRestApiLoadsAtClassStart; | ||
| private long loadTableInvocationsAtClassStart; | ||
|
|
||
| @BeforeAll |
There was a problem hiding this comment.
[AI Generated] nit: JUnit 5 @BeforeAll/@AfterAll requires either static methods or @TestInstance(Lifecycle.PER_CLASS). These are instance methods — please verify @TestInstance(PER_CLASS) is configured on this class or a parent, otherwise they won't be invoked as expected.
There was a problem hiding this comment.
Yes
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public abstract class UnityCatalogSupport {
388175d to
86ed825
Compare
|
|
||
| /** | ||
| * [[AbstractDeltaCatalogClient]] backed by a [[UCDeltaClient]]; translates between | ||
| * Spark/Delta types and the storage-side UC types. |
There was a problem hiding this comment.
define the fallbackLoadTable param
| fallbackLoadTable: Identifier => Table = UCDeltaCatalogClientImpl.defaultFallbackLoadTable) | ||
| extends AbstractDeltaCatalogClient with Logging { | ||
|
|
There was a problem hiding this comment.
| fallbackLoadTable: Identifier => Table = UCDeltaCatalogClientImpl.defaultFallbackLoadTable) | |
| extends AbstractDeltaCatalogClient with Logging { | |
| fallbackLoadTableFunc: Identifier => Table = UCDeltaCatalogClientImpl.defaultFallbackLoadTable) | |
| extends AbstractDeltaCatalogClient with Logging { | |
|
|
||
| /** Subclasses can override to false for A/B comparison with the legacy path. */ | ||
| protected boolean useDeltaRestApiForTests() { | ||
| return true; |
There was a problem hiding this comment.
Also separate out the tests that can't work without useDeltaRestApiForTests
There was a problem hiding this comment.
subclasss overrides twice.
tdas
left a comment
There was a problem hiding this comment.
high lgtm. top concerns
- we need to test both new api turned on and off. can be follow up work, but needs to be done
When `deltaRestApi.enabled` is set on a UC catalog, AbstractDeltaCatalog routes table loads through UCDeltaCatalogClientImpl, backed by UCDeltaTokenBasedRestClient. Non-Delta tables fall back to the legacy delegate. Signed-off-by: Yi Li <yi.li@databricks.com>
Signed-off-by: Yi Li <yi.li@databricks.com>
86ed825 to
3484af3
Compare
Signed-off-by: Yi Li <yi.li@databricks.com>
3484af3 to
32b44f9
Compare
| // scalastyle:off classforname | ||
| val cls = Class.forName(UC_DELTA_CATALOG_CLIENT_IMPL_CLASS_NAME + "$") | ||
| // scalastyle:on classforname | ||
| cls.getField("MODULE$").get(null).asInstanceOf[AbstractDeltaCatalogClientFactory] |
There was a problem hiding this comment.
What's the meaning of getField("MODULE$").get(null) ?
There was a problem hiding this comment.
I mean what's the meaning of the null here ? seems weird.
There was a problem hiding this comment.
Just reading the javadoc of the Field.java
/**
* Returns the value of the field represented by this {@code Field}, on
* the specified object. The value is automatically wrapped in an
* object if it has a primitive type.
*
* <p>The underlying field's value is obtained as follows:
*
* <p>If the underlying field is a static field, the {@code obj} argument
* is ignored; it may be null.
*
* <p>Otherwise, the underlying field is an instance field. If the
* specified {@code obj} argument is null, the method throws a
* {@code NullPointerException}. If the specified object is not an
* instance of the class or interface declaring the underlying
* field, the method throws an {@code IllegalArgumentException}.
*
* <p>If this {@code Field} object is enforcing Java language access control, and
* the underlying field is inaccessible, the method throws an
* {@code IllegalAccessException}.
* If the underlying field is static, the class that declared the
* field is initialized if it has not already been initialized.
*
* <p>Otherwise, the value is retrieved from the underlying instance
* or static field. If the field has a primitive type, the value
* is wrapped in an object before being returned, otherwise it is
* returned as is.
*
* <p>If the field is hidden in the type of {@code obj},
* the field's value is obtained according to the preceding rules.
*
* @param obj object from which the represented field's value is
* to be extracted
* @return the value of the represented field in object
* {@code obj}; primitive values are wrapped in an appropriate
* object before being returned
*
* @throws IllegalAccessException if this {@code Field} object
* is enforcing Java language access control and the underlying
* field is inaccessible.
* @throws IllegalArgumentException if the specified object is not an
* instance of the class or interface declaring the underlying
* field (or a subclass or implementor thereof).
* @throws NullPointerException if the specified object is null
* and the field is an instance field.
* @throws ExceptionInInitializerError if the initialization provoked
* by this method fails.
*/
@CallerSensitive
@ForceInline // to ensure Reflection.getCallerClass optimization
public Object get(Object obj)
throws IllegalArgumentException, IllegalAccessException
{
if (!override) {
Class<?> caller = Reflection.getCallerClass();
checkAccess(caller, obj);
return getFieldAccessor().get(obj);
} else {
return getOverrideFieldAccessor().get(obj);
}
}There was a problem hiding this comment.
Okay, because you are accessing the object UCDeltaCatalogClientImpl as a static class.
According to this key comment:
* <p>If the underlying field is a static field, the {@code obj} argument
* is ignored; it may be null.
*
* <p>Otherwise, the underlying field is an instance field. If the
* specified {@code obj} argument is null, the method throws a
* {@code NullPointerException}.| */ | ||
| def successfulDeltaRestApiLoadsForTesting: Long = successfulDeltaRestApiLoadsCounter.get() | ||
|
|
||
| private[catalog] val ServerSidePlanningEnabledKey: String = "serverSidePlanning.enabled" |
There was a problem hiding this comment.
Do we also have the tests that cover the serverSidePlanning is also working for the delta rest catalog ?
There was a problem hiding this comment.
Yes loadTable falls back to SSP on CredentialFetchFailedException when SSP is enabled and loadTable without serverSidePlanningEnabled rethrows CredentialFetchFailedException
| Seq( | ||
| UCTokenBasedRestClientFactory.DELTA_REST_API_ENABLED_KEY -> "true", | ||
| UCTokenBasedRestClientFactory.RENEW_CREDENTIAL_ENABLED_KEY -> "true", | ||
| UCTokenBasedRestClientFactory.CRED_SCOPED_FS_ENABLED_KEY -> "false" |
There was a problem hiding this comment.
wait, why the credScopedFs.enabled is set to false by default, I think starts from uc 0.5.0, we will have to set it to true , since we already have such tpc-ds benchmark, and covered that the credScopedFs indeed help us to save the spawn threads and connection pools, for different tables with different credentials.
There was a problem hiding this comment.
Because it was in UCSingleCatalog:
public static final String CRED_SCOPED_FS_ENABLED = "credScopedFs.enabled";
public static final boolean DEFAULT_CRED_SCOPED_FS_ENABLED = false;
There was a problem hiding this comment.
We have a PR aimed for uc 0.5.0: unitycatalog/unitycatalog#1479 to make this true by default.
|
|
||
| override def createUCClient(ucConfig: Map[String, String]): UCClient = { | ||
| val uri = ucConfig.getOrElse(URI_KEY, | ||
| override def createUCClient(ucConfig: java.util.Map[String, String]): UCClient = { |
There was a problem hiding this comment.
Why change this ucConfig to java.util.Map in the scala code ? seems like you want to call this createUCClient in the java code, that's why you change it as a Java-friendly interface, right ?
There was a problem hiding this comment.
In Spark the catalog plugin has CaseInsensitiveStringMap which is a Java map.
override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
| /** Factory trait for creating [[UCClient]] instances from a unified configuration map. */ | ||
| trait UCClientFactory { | ||
| def createUCClient(ucConfig: Map[String, String]): UCClient | ||
| def createUCClient(ucConfig: java.util.Map[String, String]): UCClient |
There was a problem hiding this comment.
Hi @yili-db , I think the correct approach is, add a overrided
createUCClient(ucConfig: java.util.Map[String,String]): UCClient in this UCClientFactory interface, and delegate it to the scala native method def createUCClient(ucConfig: Map[String, String]): UCClient. with this, we don't need to change the upper layer code.
There was a problem hiding this comment.
The 2nd scala version createUCClient can not have a simple default implementation that calls the other one with asJava, because default impl can't be properly mocked in unit tests.
Then it has to be two createUCClient here (no default impl), and two createUCClient implemented in UCTokenBasedRestClientFactory with one calling another.
After all it doesn't seem to be much benefit.
| /** Opt-in: caller wants `UCDeltaTokenBasedRestClient` constructed with credential renewal. */ | ||
| final val RENEW_CREDENTIAL_ENABLED_KEY = "renewCredential.enabled" | ||
| /** Opt-in: caller wants `UCDeltaTokenBasedRestClient` constructed with cred-scoped FS. */ | ||
| final val CRED_SCOPED_FS_ENABLED_KEY = "credScopedFs.enabled" |
There was a problem hiding this comment.
This actually make me think more about the UCClient and UCDeltaClient initialization. the principle for us is: we should avoid to deduplicate the those catalog-level configs in the entire delta code base.
And the suggested approach for us is: make the entire UCClient and UCDeltaClient constructor to accept the ucConfig, and we should hide the catalog-level configs initialization inside the UCClient.
| java.lang.Boolean.TYPE, | ||
| java.lang.Boolean.TYPE, | ||
| classOf[Supplier[_]]) |
There was a problem hiding this comment.
This will requires the UCTokenBasedRestClient also add the unecessary renewCred, credScopedFs, hadoopConfSupplier parameters in its constructor, which actually have meaningless in the UCTokenBasedRestClient.
There was a problem hiding this comment.
Yes. That's the cost of unifying the constructor.
openinx
left a comment
There was a problem hiding this comment.
Generally looks good to me, but there is a following-up PR to address the comments:
- Avoid the
UCClientFactoryto perceive the concrete catalog configs, currently,UCDeltaCatalogClientImpl,UCClientFactory,UCTokenBaseRestClient,UCDeltaRestBaseRestClient, all need to parse and perceive the configs. Otherwise, it's every easy to make us produce buggy code, once we forget one of the classes. credScopedFs.enabled, this flag will need to set to be true. as uc 0.5.0 will set it to true by default.
🥞 Stacked PR
Use this link to review incremental changes.
Which Delta project/connector is this regarding?
Description
[Spark] Route Delta loadTable through the Delta REST API
When
deltaRestApi.enabledis set on a UC catalog, AbstractDeltaCatalog routes table loads through UCDeltaCatalogClientImpl, backed by UCDeltaTokenBasedRestClient. Non-Delta tables fall back to the legacy delegate.How was this patch tested?
Existing integration tests.
Does this PR introduce any user-facing changes?
No.