feat(Spanner): integrate SourceConfigParser to centralize shard configuration loading for SpannerToSourceDb pipelines#3840
Conversation
aeaefa2 to
ca0311c
Compare
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #3840 +/- ##
=============================================
+ Coverage 39.94% 55.47% +15.52%
- Complexity 684 6509 +5825
=============================================
Files 208 1102 +894
Lines 14902 67463 +52561
Branches 1528 7567 +6039
=============================================
+ Hits 5953 37425 +31472
- Misses 8451 27591 +19140
- Partials 498 2447 +1949
🚀 New features to boost your workflow:
|
ca0311c to
87ab9eb
Compare
7a2cdc1 to
6436182
Compare
| ResultSet mockRs = mock(ResultSet.class); | ||
|
|
||
| when(mockConn.createStatement()).thenReturn(mockStmt); | ||
| when(mockStmt.executeQuery("SHOW VARIABLES LIKE 'read_only'")).thenReturn(mockRs); |
There was a problem hiding this comment.
It's removed as mokito logs show that these mocks are not used in the test.
08bd358 to
213e4aa
Compare
|
Warning Gemini encountered an error creating the summary. You can try again by commenting |
| // cassandra is always a single sharded migration. | ||
| // for JDBC, shards size and IsShardedMigration option is used below. | ||
| String shardingMode = | ||
| options.getSourceType().equals(CASSANDRA_SOURCE_TYPE) |
There was a problem hiding this comment.
This should be moved to the respective config parsing flow rather than having it as a top level decision
There was a problem hiding this comment.
How does the non-sharded workflow work here ?
There was a problem hiding this comment.
shardingMode: this is used in AssignShardIdFn to identify if default shard id is needed to be assigned to stream events.
I could have used the size of shards list to identify shardingMode.
The primary concern in doing so was the logic below (at line 672) where this is also dependent on the user input getIsShardedMigration.
There was a problem hiding this comment.
We can AND that flag here to get to the output. In any case, not a blocker to letting this through. Lets take it up as a follow up
| List<Shard> shards; | ||
| if (sourceConnectionConfig instanceof JdbcShardConfig) { | ||
| shards = ((JdbcShardConfig) sourceConnectionConfig).getShardConfigs(); | ||
| LOG.info("JDBC config is: {}", shards); |
There was a problem hiding this comment.
do not log this, this might have PII
| shards = | ||
| cassandraConfigFileReader.getCassandraShard( | ||
| ((CassandraConnectionConfig) sourceConnectionConfig).getOptionsMap()); | ||
| LOG.info("Cassandra config is: {}", shards.get(0)); |
There was a problem hiding this comment.
Do not log. It might have PII
| <groupId>com.datastax.cassandra</groupId> | ||
| <artifactId>cassandra-driver-core</artifactId> | ||
| </exclusion> | ||
| <exclusion> |
There was a problem hiding this comment.
why is this excluded ? Is it a change in beam ?
There was a problem hiding this comment.
Without this exclusion, there is a version conflict with:
org.apache.cassandra
java-driver-core
${cassandra-java-driver-core.version}
This dependency is test scoped. It was not causing any issue earlier as we didn't have tests which was using this package.
| 9. Configuration Files Upload | ||
| - **For MySQL:** | ||
| [Source shards file](./RunnigReverseReplication.md#sample-source-shards-file-for-MySQL) already uploaded to GCS. | ||
| [source shards file](#sample-source-shards-file-for-MySQL) already uploaded to GCS. |
There was a problem hiding this comment.
Please keep the capitalization here for uniformity
| * @param sourceShardsFilePath The GCS path to the source shards configuration file. | ||
| * @return A list of shards. | ||
| */ | ||
| public static List<Shard> getShardList(String sourceType, String sourceShardsFilePath) { |
There was a problem hiding this comment.
ideally this method should move to spanner-common
There was a problem hiding this comment.
The logic below is specific to Reverse replication. There is no synergy on this between reverse and bulk.
Also, the plan is to remove this in phase 2. I have added a comment to reflect this.
f52391a to
f6ce311
Compare
…guration loading for SpannerToSourceDb pipelines
f6ce311 to
9014e52
Compare
bharadwaj-aditya
left a comment
There was a problem hiding this comment.
There is some follow up work here. But marking as LGTM as those items can be taken up in subsequent PRS
| // cassandra is always a single sharded migration. | ||
| // for JDBC, shards size and IsShardedMigration option is used below. | ||
| String shardingMode = | ||
| options.getSourceType().equals(CASSANDRA_SOURCE_TYPE) |
There was a problem hiding this comment.
We can AND that flag here to get to the output. In any case, not a blocker to letting this through. Lets take it up as a follow up
No description provided.