Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,23 @@ public List<Shard> getCassandraShard(String cassandraConfigFilePath) {
LOG.info("Reading Cassandra configuration from: {}", cassandraConfigFilePath);
OptionsMap optionsMap =
CassandraDriverConfigLoader.getOptionsMapFromFile(cassandraConfigFilePath);
CassandraShard shard = new CassandraShard(optionsMap);
LOG.info("Successfully created CassandraShard: {}", shard);
return Collections.singletonList(shard);
return getCassandraShard(optionsMap);
} catch (FileNotFoundException e) {
throw new IllegalArgumentException(
"Configuration file not found: " + cassandraConfigFilePath, e);
}
}

/**
* Reads the Cassandra configuration file from the specified GCS path and converts it into a list
* of CassandraShard objects.
*
* @param cassandraConfigFilePath the GCS path of the Cassandra configuration file.
* @return a list containing the parsed CassandraShard.
*/
public List<Shard> getCassandraShard(OptionsMap optionsMap) {
CassandraShard shard = new CassandraShard(optionsMap);
LOG.info("Successfully created CassandraShard: {}", shard);
return Collections.singletonList(shard);
}
}
42 changes: 22 additions & 20 deletions v2/spanner-to-sourcedb/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ A few prerequisites must be considered before starting with reverse replication.
8. Ensure that that [session file](https://googlecloudplatform.github.io/spanner-migration-tool/reports.html#session-file-ending-in-sessionjson) is uploaded to GCS (this requires a schema conversion to be done).
9. Configuration Files Upload
- **For MySQL:**
[Source shards file](./RunnigReverseReplication.md#sample-source-shards-file-for-MySQL) already uploaded to GCS.
[Source shards file](#sample-source-shards-file-for-MySQL) already uploaded to GCS.
- **For Cassandra:**
[Source file](./RunnigReverseReplication.md#Sample-source-File-for-Cassandra) already uploaded to GCS.
[Source file](#sample-source-file-for-Cassandra) already uploaded to GCS.
10. Resources needed for reverse replication incur cost. Make sure to read [cost](#cost).
11. Reverse replication uses shard identifier column per table to route the Spanner records to a given source shard.The column identified as the sharding column needs to be selected via Spanner Migration Tool when performing migration.The value of this column should be the logicalShardId value specified in the [source shard file](#sample-source-shards-file-for-MySQL).In the event that the shard identifier column is not an existing column,the application code needs to be changed to populate this shard identifier column when writing to Spanner. Or use a custom shard identifier plugin to supply the shard identifier. In case of single shard migrations, this step is skipped.
12. The reverse replication pipeline uses GCS for dead letter queue handling. Ensure that the DLQ directory exists in GCS.
Expand Down Expand Up @@ -141,24 +141,26 @@ The database user password should be kept in [Secret Manager](#https://cloud.goo
The file should be a list of JSONs as:

```json
[
{
"logicalShardId": "shard1",
"host": "10.11.12.13",
"user": "root",
"secretManagerUri":"projects/123/secrets/rev-cmek-cred-shard1/versions/latest",
"port": "3306",
"dbName": "db1"
},
{
"logicalShardId": "shard2",
"host": "10.11.12.14",
"user": "root",
"secretManagerUri":"projects/123/secrets/rev-cmek-cred-shard2/versions/latest",
"port": "3306",
"dbName": "db2"
}
]
{
"shardConfigs": [
{
"logicalShardId": "shard1",
"host": "10.11.12.13",
"user": "root",
"secretManagerUri": "projects/123/secrets/rev-cmek-cred-shard1/versions/latest",
"port": "3306",
"dbName": "db1"
},
{
"logicalShardId": "shard2",
"host": "10.11.12.14",
"user": "root",
"secretManagerUri": "projects/123/secrets/rev-cmek-cred-shard2/versions/latest",
"port": "3306",
"dbName": "db2"
}
]
}
```


Expand Down
4 changes: 4 additions & 0 deletions v2/spanner-to-sourcedb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
</exclusion>
<exclusion>
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this excluded ? Is it a change in beam ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without this exclusion, there is a version conflict with:
org.apache.cassandra
java-driver-core
${cassandra-java-driver-core.version}

This dependency is test scoped. It was not causing any issue earlier as we didn't have tests which was using this package.

<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.CASSANDRA_SOURCE_TYPE;
import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.MYSQL_SOURCE_TYPE;
import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.POSTGRES_SOURCE_TYPE;
import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.RUN_MODE_REGULAR;
Expand All @@ -38,12 +39,16 @@
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
import com.google.cloud.teleport.v2.spanner.migrations.shard.CassandraShard;
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
import com.google.cloud.teleport.v2.spanner.migrations.source.config.CassandraConnectionConfig;
import com.google.cloud.teleport.v2.spanner.migrations.source.config.JdbcShardConfig;
import com.google.cloud.teleport.v2.spanner.migrations.source.config.SourceConfigParser;
import com.google.cloud.teleport.v2.spanner.migrations.source.config.SourceConnectionConfig;
import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation;
import com.google.cloud.teleport.v2.spanner.migrations.utils.CassandraConfigFileReader;
import com.google.cloud.teleport.v2.spanner.migrations.utils.CassandraDriverConfigLoader;
import com.google.cloud.teleport.v2.spanner.migrations.utils.DataflowWorkerMachineTypeUtils;
import com.google.cloud.teleport.v2.spanner.migrations.utils.ISecretManagerAccessor;
import com.google.cloud.teleport.v2.spanner.migrations.utils.SecretManagerAccessorImpl;
import com.google.cloud.teleport.v2.spanner.migrations.utils.ShardFileReader;
import com.google.cloud.teleport.v2.spanner.sourceddl.CassandraInformationSchemaScanner;
import com.google.cloud.teleport.v2.spanner.sourceddl.MySqlInformationSchemaScanner;
import com.google.cloud.teleport.v2.spanner.sourceddl.PostgreSQLInformationSchemaScanner;
Expand All @@ -61,6 +66,7 @@
import com.google.cloud.teleport.v2.templates.transforms.UpdateDlqMetricsFn;
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
Expand Down Expand Up @@ -647,20 +653,14 @@ public static PipelineResult run(Options options) {
.get(SpannerInformationSchemaProcessorTransform.SHADOW_TABLE_DDL_TAG)
.apply("View Shadow DDL", View.asSingleton());

List<Shard> shards;
String shardingMode;
if (MYSQL_SOURCE_TYPE.equals(options.getSourceType())
|| POSTGRES_SOURCE_TYPE.equals(options.getSourceType())) {
ShardFileReader shardFileReader = new ShardFileReader(new SecretManagerAccessorImpl());
shards = shardFileReader.getOrderedShardDetails(options.getSourceShardsFilePath());
shardingMode = Constants.SHARDING_MODE_MULTI_SHARD;
List<Shard> shards = getShardList(options.getSourceType(), options.getSourceShardsFilePath());

} else {
CassandraConfigFileReader cassandraConfigFileReader = new CassandraConfigFileReader();
shards = cassandraConfigFileReader.getCassandraShard(options.getSourceShardsFilePath());
LOG.info("Cassandra config is: {}", shards.get(0));
shardingMode = Constants.SHARDING_MODE_SINGLE_SHARD;
}
// cassandra is always a single sharded migration.
// for JDBC, shards size and IsShardedMigration option is used below.
String shardingMode =
options.getSourceType().equals(CASSANDRA_SOURCE_TYPE)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be moved to the respective config parsing flow rather than having it as a top level decision

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does the non-sharded workflow work here ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shardingMode: this is used in AssignShardIdFn to identify if default shard id is needed to be assigned to stream events.

I could have used the size of shards list to identify shardingMode.

The primary concern in doing so was the logic below (at line 672) where this is also dependent on the user input getIsShardedMigration.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can AND that flag here to get to the output. In any case, not a blocker to letting this through. Lets take it up as a follow up

? Constants.SHARDING_MODE_SINGLE_SHARD
: Constants.SHARDING_MODE_MULTI_SHARD;

if (MYSQL_SOURCE_TYPE.equals(options.getSourceType())) {
validateMySQLNotReadOnly(shards);
Expand Down Expand Up @@ -951,6 +951,53 @@ static void buildPipeline(
.build());
}

/**
* Returns a list of shards based on the source type and source shards file path. This should be
* removed in Phase 2 of Standardizing config.
*
* @param sourceType The type of the source database.
* @param sourceShardsFilePath The GCS path to the source shards configuration file.
* @return A list of shards.
*/
public static List<Shard> getShardList(String sourceType, String sourceShardsFilePath) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ideally this method should move to spanner-common

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic below is specific to Reverse replication. There is no synergy on this between reverse and bulk.

Also, the plan is to remove this in phase 2. I have added a comment to reflect this.

ISecretManagerAccessor secretManagerAccessor = new SecretManagerAccessorImpl();
SourceConfigParser sourceConfigParser = new SourceConfigParser(secretManagerAccessor);
SourceConnectionConfig sourceConnectionConfig;
try {
// Parse the source shards configuration file to respective
// SourceConnectionConfig.
sourceConnectionConfig =
sourceConfigParser.parseConfiguration(sourceType, sourceShardsFilePath);
} catch (Exception e) {
LOG.error("Error parsing source config", e);
throw new RuntimeException("Error parsing source config", e);
}
List<Shard> shards;
if (sourceConnectionConfig instanceof JdbcShardConfig) {
shards = ((JdbcShardConfig) sourceConnectionConfig).getShardConfigs();
LOG.info("JDBC shard config is parsed.");
} else if (sourceConnectionConfig instanceof CassandraConnectionConfig) {
CassandraConfigFileReader cassandraConfigFileReader = new CassandraConfigFileReader();
shards =
cassandraConfigFileReader.getCassandraShard(
((CassandraConnectionConfig) sourceConnectionConfig).getOptionsMap());
LOG.info("Cassandra shard config is parsed.");
} else {
String errorMessage =
"Invalid source config for source type: "
+ sourceType
+ ". Source config parsed to: "
+ sourceConnectionConfig.getClass()
+ ". Source config file path: "
+ sourceShardsFilePath;
LOG.error(errorMessage);
throw new RuntimeException(errorMessage);
}
Preconditions.checkArgument(
shards != null && !shards.isEmpty(), "Shard list should have at least 1 element.");
return shards;
}

public static SpannerIO.ReadChangeStream getReadChangeStreamDoFn(
Options options, SpannerConfig spannerConfig) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,8 @@

import com.google.cloud.teleport.metadata.SkipDirectRunnerTest;
import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation;
import com.google.common.io.Resources;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
Expand Down Expand Up @@ -105,7 +101,11 @@ public void setUp() throws IOException, InterruptedException {
gcsResourceManager = setUpSpannerITGcsResourceManager();

// Use generic multi-shard logic instead of base IT helper
createAndUploadShardConfigToGcsMulti();
createAndUploadShardConfigToGcs(
gcsResourceManager,
Map.of(
"testShardA", jdbcResourceManagerShardA, "testShardB", jdbcResourceManagerShardB));
;

// Upload overrides file
gcsResourceManager.uploadArtifact(
Expand Down Expand Up @@ -394,35 +394,6 @@ private Integer getIntValueCaseInsensitive(Map<String, Object> map, String key)
return null;
}

private void createAndUploadShardConfigToGcsMulti() throws IOException {
Shard shardA = new Shard();
shardA.setLogicalShardId("testShardA");
shardA.setUser(jdbcResourceManagerShardA.getUsername());
shardA.setHost(jdbcResourceManagerShardA.getHost());
shardA.setPassword(jdbcResourceManagerShardA.getPassword());
shardA.setPort(String.valueOf(jdbcResourceManagerShardA.getPort()));
shardA.setDbName(jdbcResourceManagerShardA.getDatabaseName());
JsonObject jsObjA = (JsonObject) new Gson().toJsonTree(shardA).getAsJsonObject();
jsObjA.remove("secretManagerUri"); // remove field secretManagerUri

Shard shardB = new Shard();
shardB.setLogicalShardId("testShardB");
shardB.setUser(jdbcResourceManagerShardB.getUsername());
shardB.setHost(jdbcResourceManagerShardB.getHost());
shardB.setPassword(jdbcResourceManagerShardB.getPassword());
shardB.setPort(String.valueOf(jdbcResourceManagerShardB.getPort()));
shardB.setDbName(jdbcResourceManagerShardB.getDatabaseName());
JsonObject jsObjB = (JsonObject) new Gson().toJsonTree(shardB).getAsJsonObject();
jsObjB.remove("secretManagerUri"); // remove field secretManagerUri

JsonArray ja = new JsonArray();
ja.add(jsObjA);
ja.add(jsObjB);
String shardFileContents = ja.toString();
LOG.info("Shard file contents: {}", shardFileContents);
gcsResourceManager.createArtifact("input/shard.json", shardFileContents);
}

private void insertDataInSpanner() {
com.google.cloud.spanner.Mutation customer1 =
com.google.cloud.spanner.Mutation.newInsertOrUpdateBuilder("Customers")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,8 @@

import com.google.cloud.teleport.metadata.SkipDirectRunnerTest;
import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation;
import com.google.common.io.Resources;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.pubsub.v1.SubscriptionName;
import java.io.IOException;
import java.time.Duration;
Expand Down Expand Up @@ -108,7 +104,10 @@ public void setUp() throws IOException, InterruptedException {
SpannerToSourceDBShardedMySQLRetryDLQIT.MYSQL_SCHEMA_FILE_RESOURCE);

gcsResourceManager = setUpSpannerITGcsResourceManager();
createAndUploadShardConfigToGcsMulti();
createAndUploadShardConfigToGcs(
gcsResourceManager,
Map.of(
"testShardA", jdbcResourceManagerShardA, "testShardB", jdbcResourceManagerShardB));

// Upload session file
gcsResourceManager.uploadArtifact(
Expand Down Expand Up @@ -374,35 +373,6 @@ private Integer getIntValueCaseInsensitive(Map<String, Object> map, String key)
return null;
}

private void createAndUploadShardConfigToGcsMulti() throws IOException {
Shard shardA = new Shard();
shardA.setLogicalShardId("testShardA");
shardA.setUser(jdbcResourceManagerShardA.getUsername());
shardA.setHost(jdbcResourceManagerShardA.getHost());
shardA.setPassword(jdbcResourceManagerShardA.getPassword());
shardA.setPort(String.valueOf(jdbcResourceManagerShardA.getPort()));
shardA.setDbName(jdbcResourceManagerShardA.getDatabaseName());
JsonObject jsObjA = (JsonObject) new Gson().toJsonTree(shardA).getAsJsonObject();
jsObjA.remove("secretManagerUri");

Shard shardB = new Shard();
shardB.setLogicalShardId("testShardB");
shardB.setUser(jdbcResourceManagerShardB.getUsername());
shardB.setHost(jdbcResourceManagerShardB.getHost());
shardB.setPassword(jdbcResourceManagerShardB.getPassword());
shardB.setPort(String.valueOf(jdbcResourceManagerShardB.getPort()));
shardB.setDbName(jdbcResourceManagerShardB.getDatabaseName());
JsonObject jsObjB = (JsonObject) new Gson().toJsonTree(shardB).getAsJsonObject();
jsObjB.remove("secretManagerUri");

JsonArray ja = new JsonArray();
ja.add(jsObjA);
ja.add(jsObjB);
String shardFileContents = ja.toString();
LOG.info("Shard file contents: {}", shardFileContents);
gcsResourceManager.createArtifact("input/shard.json", shardFileContents);
}

private void insertDataInSpanner() {
com.google.cloud.spanner.Mutation customer1 =
com.google.cloud.spanner.Mutation.newInsertOrUpdateBuilder("Customers")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,7 @@
import com.google.cloud.spanner.Mutation;
import com.google.cloud.teleport.metadata.SkipDirectRunnerTest;
import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
import com.google.common.io.Resources;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.pubsub.v1.SubscriptionName;
import java.io.IOException;
import java.time.Duration;
Expand Down Expand Up @@ -103,7 +99,10 @@ public void setUp() throws IOException, InterruptedException {
gcsResourceManager = setUpSpannerITGcsResourceManager();
createAndUploadJarToGcs(gcsResourceManager);

createAndUploadShardConfigToGcs();
createAndUploadShardConfigToGcs(
gcsResourceManager,
Map.of(
"testShardA", jdbcResourceManagerShardA, "testShardB", jdbcResourceManagerShardB));
gcsResourceManager.uploadArtifact(
"input/session.json",
Resources.getResource(SpannerToSourceDbCustomShardIT.SESSION_FILE_RESOURCE).getPath());
Expand Down Expand Up @@ -211,32 +210,4 @@ private void writeSpannerDataForSingers(int singerId, String firstName, String s
.build();
spannerResourceManager.write(m);
}

private void createAndUploadShardConfigToGcs() throws IOException {
Shard shard = new Shard();
shard.setLogicalShardId("testShardA");
shard.setUser(jdbcResourceManagerShardA.getUsername());
shard.setHost(jdbcResourceManagerShardA.getHost());
shard.setPassword(jdbcResourceManagerShardA.getPassword());
shard.setPort(String.valueOf(jdbcResourceManagerShardA.getPort()));
shard.setDbName(jdbcResourceManagerShardA.getDatabaseName());
JsonObject jsObj = (JsonObject) new Gson().toJsonTree(shard).getAsJsonObject();
jsObj.remove("secretManagerUri"); // remove field secretManagerUri

Shard shardB = new Shard();
shardB.setLogicalShardId("testShardB");
shardB.setUser(jdbcResourceManagerShardB.getUsername());
shardB.setHost(jdbcResourceManagerShardB.getHost());
shardB.setPassword(jdbcResourceManagerShardB.getPassword());
shardB.setPort(String.valueOf(jdbcResourceManagerShardB.getPort()));
shardB.setDbName(jdbcResourceManagerShardB.getDatabaseName());
JsonObject jsObjB = (JsonObject) new Gson().toJsonTree(shardB).getAsJsonObject();
jsObjB.remove("secretManagerUri"); // remove field secretManagerUri
JsonArray ja = new JsonArray();
ja.add(jsObj);
ja.add(jsObjB);
String shardFileContents = ja.toString();
LOG.info("Shard file contents: {}", shardFileContents);
gcsResourceManager.createArtifact("input/shard.json", shardFileContents);
}
}
Loading
Loading